This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 456dce6 [C++] Fix producer is never destructed until client is closed
(#14797)
456dce6 is described below
commit 456dce613f4e5cf58f67ff6c92eae050131ea5b6
Author: Yunze Xu <[email protected]>
AuthorDate: Wed Mar 23 14:35:38 2022 +0800
[C++] Fix producer is never destructed until client is closed (#14797)
Fixes #509
### Motivation
When a C++ producer is created successfully, it starts a send timer.
However, the timer callback captures a shared pointer to `ProducerImpl`
itself. This extends the lifetime of `ProducerImpl`, so that even after the
`Producer` object is destructed, the underlying `ProducerImpl` object is
not destructed. It can only be destructed after `Client::close()` is
called.
### Modifications
- Pass a weak pointer to `ProducerImpl` to the send timer and add an
`asyncWaitSendTimeout` method that combines the
`expires_from_now` and `async_wait` calls on the timer.
- Add `ClientTest.testReferenceCount` to verify the reference count will
become 0 after the producer or consumer destructs.
(cherry picked from commit f7cbc1eb83ffd27b784d90d5d2dea8660c590ad2)
---
pulsar-client-cpp/lib/ProducerImpl.cc | 25 ++++++++++++++++---------
pulsar-client-cpp/lib/ProducerImpl.h | 2 ++
pulsar-client-cpp/tests/ClientTest.cc | 27 +++++++++++++++++++++++++++
pulsar-client-cpp/tests/PulsarFriend.h | 8 ++++++++
4 files changed, 53 insertions(+), 9 deletions(-)
diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc
b/pulsar-client-cpp/lib/ProducerImpl.cc
index f81e205..e9812d4 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.cc
+++ b/pulsar-client-cpp/lib/ProducerImpl.cc
@@ -698,8 +698,8 @@ void ProducerImpl::handleSendTimeout(const
boost::system::error_code& err) {
std::shared_ptr<PendingCallbacks> pendingCallbacks;
if (pendingMessagesQueue_.empty()) {
// If there are no pending messages, reset the timeout to the
configured value.
- sendTimer_->expires_from_now(milliseconds(conf_.getSendTimeout()));
LOG_DEBUG(getName() << "Producer timeout triggered on empty pending
message queue");
+ asyncWaitSendTimeout(milliseconds(conf_.getSendTimeout()));
} else {
// If there is at least one message, calculate the diff between the
message timeout and
// the current time.
@@ -709,17 +709,14 @@ void ProducerImpl::handleSendTimeout(const
boost::system::error_code& err) {
LOG_DEBUG(getName() << "Timer expired. Calling timeout
callbacks.");
pendingCallbacks = getPendingCallbacksWhenFailed();
// Since the pending queue is cleared now, set timer to expire
after configured value.
- sendTimer_->expires_from_now(milliseconds(conf_.getSendTimeout()));
+ asyncWaitSendTimeout(milliseconds(conf_.getSendTimeout()));
} else {
// The diff is greater than zero, set the timeout to the diff value
LOG_DEBUG(getName() << "Timer hasn't expired yet, setting new
timeout " << diff);
- sendTimer_->expires_from_now(diff);
+ asyncWaitSendTimeout(diff);
}
}
- // Asynchronously wait for the timeout to trigger
- sendTimer_->async_wait(
- std::bind(&ProducerImpl::handleSendTimeout, shared_from_this(), std::placeholders::_1));
lock.unlock();
if (pendingCallbacks) {
pendingCallbacks->complete(ResultTimeout);
@@ -885,11 +882,21 @@ void ProducerImpl::startSendTimeoutTimer() {
// timeout to happen.
if (!sendTimer_ && conf_.getSendTimeout() > 0) {
sendTimer_ = executor_->createDeadlineTimer();
- sendTimer_->expires_from_now(milliseconds(conf_.getSendTimeout()));
- sendTimer_->async_wait(
- std::bind(&ProducerImpl::handleSendTimeout, shared_from_this(), std::placeholders::_1));
+ asyncWaitSendTimeout(milliseconds(conf_.getSendTimeout()));
}
}
+void ProducerImpl::asyncWaitSendTimeout(DurationType expiryTime) {
+ sendTimer_->expires_from_now(expiryTime);
+
+ ProducerImplBaseWeakPtr weakSelf = shared_from_this();
+ sendTimer_->async_wait([weakSelf](const boost::system::error_code& err) {
+ auto self = weakSelf.lock();
+ if (self) {
+ std::static_pointer_cast<ProducerImpl>(self)->handleSendTimeout(err);
+ }
+ });
+}
+
} // namespace pulsar
/* namespace pulsar */
diff --git a/pulsar-client-cpp/lib/ProducerImpl.h
b/pulsar-client-cpp/lib/ProducerImpl.h
index d29efed..a9eb12b 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.h
+++ b/pulsar-client-cpp/lib/ProducerImpl.h
@@ -155,6 +155,8 @@ class ProducerImpl : public HandlerBase,
DeadlineTimerPtr sendTimer_;
void handleSendTimeout(const boost::system::error_code& err);
+ using DurationType = typename boost::asio::deadline_timer::duration_type;
+ void asyncWaitSendTimeout(DurationType expiryTime);
Promise<Result, ProducerImplBaseWeakPtr> producerCreatedPromise_;
diff --git a/pulsar-client-cpp/tests/ClientTest.cc
b/pulsar-client-cpp/tests/ClientTest.cc
index 8f5e68b..920430d 100644
--- a/pulsar-client-cpp/tests/ClientTest.cc
+++ b/pulsar-client-cpp/tests/ClientTest.cc
@@ -19,6 +19,7 @@
#include <gtest/gtest.h>
#include "HttpHelper.h"
+#include "PulsarFriend.h"
#include <future>
#include <pulsar/Client.h>
@@ -176,3 +177,29 @@ TEST(ClientTest, testGetNumberOfReferences) {
client.close();
}
+
+TEST(ClientTest, testReferenceCount) {
+ Client client(lookupUrl);
+ const std::string topic = "client-test-reference-count-" + std::to_string(time(nullptr));
+
+ auto &producers = PulsarFriend::getProducers(client);
+ auto &consumers = PulsarFriend::getConsumers(client);
+
+ {
+ Producer producer;
+ ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
+ ASSERT_EQ(producers.size(), 1);
+ ASSERT_EQ(producers[0].use_count(), 1);
+
+ Consumer consumer;
+ ASSERT_EQ(ResultOk, client.subscribe(topic, "my-sub", consumer));
+ ASSERT_EQ(consumers.size(), 1);
+ ASSERT_EQ(consumers[0].use_count(), 1);
+ }
+
+ ASSERT_EQ(producers.size(), 1);
+ ASSERT_EQ(producers[0].use_count(), 0);
+ ASSERT_EQ(consumers.size(), 1);
+ ASSERT_EQ(consumers[0].use_count(), 0);
+ client.close();
+}
diff --git a/pulsar-client-cpp/tests/PulsarFriend.h
b/pulsar-client-cpp/tests/PulsarFriend.h
index aed7096..74aa1f7 100644
--- a/pulsar-client-cpp/tests/PulsarFriend.h
+++ b/pulsar-client-cpp/tests/PulsarFriend.h
@@ -89,6 +89,14 @@ class PulsarFriend {
static std::shared_ptr<ClientImpl> getClientImplPtr(Client client) {
return client.impl_; }
+ static ClientImpl::ProducersList& getProducers(const Client& client) {
+ return getClientImplPtr(client)->producers_;
+ }
+
+ static ClientImpl::ConsumersList& getConsumers(const Client& client) {
+ return getClientImplPtr(client)->consumers_;
+ }
+
static void setNegativeAckEnabled(Consumer consumer, bool enabled) {
consumer.impl_->setNegativeAcknowledgeEnabledForTesting(enabled);
}