This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 456dce6  [C++] Fix producer is never destructed until client is closed 
(#14797)
456dce6 is described below

commit 456dce613f4e5cf58f67ff6c92eae050131ea5b6
Author: Yunze Xu <[email protected]>
AuthorDate: Wed Mar 23 14:35:38 2022 +0800

    [C++] Fix producer is never destructed until client is closed (#14797)
    
    Fixes #509
    
    ### Motivation
    
    When a C++ producer is created successfully, it will start a send timer.
    However the callback has captured the shared pointer of `ProducerImpl`
    itself. It extends the lifetime of `ProducerImpl` so that even after the
    `Producer` object destructs, the underlying `ProducerImpl` object won't
    be destructed. It could only be destructed after `Client::close()` is
    called.
    
    ### Modifications
    
    - Pass a weak pointer of `ProducerImpl` to the send timer and add a
      `asyncWaitSendTimeout` method for the combination of
      `expires_from_now` and `async_wait` calls on the timer.
    - Add `ClientTest.testReferenceCount` to verify the reference count will
      become 0 after the producer or consumer destructs.
    
    (cherry picked from commit f7cbc1eb83ffd27b784d90d5d2dea8660c590ad2)
---
 pulsar-client-cpp/lib/ProducerImpl.cc  | 25 ++++++++++++++++---------
 pulsar-client-cpp/lib/ProducerImpl.h   |  2 ++
 pulsar-client-cpp/tests/ClientTest.cc  | 27 +++++++++++++++++++++++++++
 pulsar-client-cpp/tests/PulsarFriend.h |  8 ++++++++
 4 files changed, 53 insertions(+), 9 deletions(-)

diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc 
b/pulsar-client-cpp/lib/ProducerImpl.cc
index f81e205..e9812d4 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.cc
+++ b/pulsar-client-cpp/lib/ProducerImpl.cc
@@ -698,8 +698,8 @@ void ProducerImpl::handleSendTimeout(const 
boost::system::error_code& err) {
     std::shared_ptr<PendingCallbacks> pendingCallbacks;
     if (pendingMessagesQueue_.empty()) {
         // If there are no pending messages, reset the timeout to the 
configured value.
-        sendTimer_->expires_from_now(milliseconds(conf_.getSendTimeout()));
         LOG_DEBUG(getName() << "Producer timeout triggered on empty pending 
message queue");
+        asyncWaitSendTimeout(milliseconds(conf_.getSendTimeout()));
     } else {
         // If there is at least one message, calculate the diff between the 
message timeout and
         // the current time.
@@ -709,17 +709,14 @@ void ProducerImpl::handleSendTimeout(const 
boost::system::error_code& err) {
             LOG_DEBUG(getName() << "Timer expired. Calling timeout 
callbacks.");
             pendingCallbacks = getPendingCallbacksWhenFailed();
             // Since the pending queue is cleared now, set timer to expire 
after configured value.
-            sendTimer_->expires_from_now(milliseconds(conf_.getSendTimeout()));
+            asyncWaitSendTimeout(milliseconds(conf_.getSendTimeout()));
         } else {
             // The diff is greater than zero, set the timeout to the diff value
             LOG_DEBUG(getName() << "Timer hasn't expired yet, setting new 
timeout " << diff);
-            sendTimer_->expires_from_now(diff);
+            asyncWaitSendTimeout(diff);
         }
     }
 
-    // Asynchronously wait for the timeout to trigger
-    sendTimer_->async_wait(
-        std::bind(&ProducerImpl::handleSendTimeout, shared_from_this(), 
std::placeholders::_1));
     lock.unlock();
     if (pendingCallbacks) {
         pendingCallbacks->complete(ResultTimeout);
@@ -885,11 +882,21 @@ void ProducerImpl::startSendTimeoutTimer() {
     // timeout to happen.
     if (!sendTimer_ && conf_.getSendTimeout() > 0) {
         sendTimer_ = executor_->createDeadlineTimer();
-        sendTimer_->expires_from_now(milliseconds(conf_.getSendTimeout()));
-        sendTimer_->async_wait(
-            std::bind(&ProducerImpl::handleSendTimeout, shared_from_this(), 
std::placeholders::_1));
+        asyncWaitSendTimeout(milliseconds(conf_.getSendTimeout()));
     }
 }
 
+void ProducerImpl::asyncWaitSendTimeout(DurationType expiryTime) {
+    sendTimer_->expires_from_now(expiryTime);
+
+    ProducerImplBaseWeakPtr weakSelf = shared_from_this();
+    sendTimer_->async_wait([weakSelf](const boost::system::error_code& err) {
+        auto self = weakSelf.lock();
+        if (self) {
+            
std::static_pointer_cast<ProducerImpl>(self)->handleSendTimeout(err);
+        }
+    });
+}
+
 }  // namespace pulsar
 /* namespace pulsar */
diff --git a/pulsar-client-cpp/lib/ProducerImpl.h 
b/pulsar-client-cpp/lib/ProducerImpl.h
index d29efed..a9eb12b 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.h
+++ b/pulsar-client-cpp/lib/ProducerImpl.h
@@ -155,6 +155,8 @@ class ProducerImpl : public HandlerBase,
 
     DeadlineTimerPtr sendTimer_;
     void handleSendTimeout(const boost::system::error_code& err);
+    using DurationType = typename boost::asio::deadline_timer::duration_type;
+    void asyncWaitSendTimeout(DurationType expiryTime);
 
     Promise<Result, ProducerImplBaseWeakPtr> producerCreatedPromise_;
 
diff --git a/pulsar-client-cpp/tests/ClientTest.cc 
b/pulsar-client-cpp/tests/ClientTest.cc
index 8f5e68b..920430d 100644
--- a/pulsar-client-cpp/tests/ClientTest.cc
+++ b/pulsar-client-cpp/tests/ClientTest.cc
@@ -19,6 +19,7 @@
 #include <gtest/gtest.h>
 
 #include "HttpHelper.h"
+#include "PulsarFriend.h"
 
 #include <future>
 #include <pulsar/Client.h>
@@ -176,3 +177,29 @@ TEST(ClientTest, testGetNumberOfReferences) {
 
     client.close();
 }
+
+TEST(ClientTest, testReferenceCount) {
+    Client client(lookupUrl);
+    const std::string topic = "client-test-reference-count-" + 
std::to_string(time(nullptr));
+
+    auto &producers = PulsarFriend::getProducers(client);
+    auto &consumers = PulsarFriend::getConsumers(client);
+
+    {
+        Producer producer;
+        ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
+        ASSERT_EQ(producers.size(), 1);
+        ASSERT_EQ(producers[0].use_count(), 1);
+
+        Consumer consumer;
+        ASSERT_EQ(ResultOk, client.subscribe(topic, "my-sub", consumer));
+        ASSERT_EQ(consumers.size(), 1);
+        ASSERT_EQ(consumers[0].use_count(), 1);
+    }
+
+    ASSERT_EQ(producers.size(), 1);
+    ASSERT_EQ(producers[0].use_count(), 0);
+    ASSERT_EQ(consumers.size(), 1);
+    ASSERT_EQ(consumers[0].use_count(), 0);
+    client.close();
+}
diff --git a/pulsar-client-cpp/tests/PulsarFriend.h 
b/pulsar-client-cpp/tests/PulsarFriend.h
index aed7096..74aa1f7 100644
--- a/pulsar-client-cpp/tests/PulsarFriend.h
+++ b/pulsar-client-cpp/tests/PulsarFriend.h
@@ -89,6 +89,14 @@ class PulsarFriend {
 
     static std::shared_ptr<ClientImpl> getClientImplPtr(Client client) { 
return client.impl_; }
 
+    static ClientImpl::ProducersList& getProducers(const Client& client) {
+        return getClientImplPtr(client)->producers_;
+    }
+
+    static ClientImpl::ConsumersList& getConsumers(const Client& client) {
+        return getClientImplPtr(client)->consumers_;
+    }
+
     static void setNegativeAckEnabled(Consumer consumer, bool enabled) {
         consumer.impl_->setNegativeAcknowledgeEnabledForTesting(enabled);
     }

Reply via email to