This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new af45a54 Fix the handler instance is expired when the connection is
established (#323)
af45a54 is described below
commit af45a54c10ec5b06e80b683010afd3531457ac64
Author: Yunze Xu <[email protected]>
AuthorDate: Thu Oct 5 00:23:25 2023 +0800
Fix the handler instance is expired when the connection is established
(#323)
### Motivation
We observed some logs that showed the handler instance is expired when
the connection is established after a reconnection:
```
HandlerBase Weak reference is not valid anymore
```
https://github.com/apache/pulsar-client-cpp/blob/b35ae1aa4b9834886c0889635de81834f9b2f774/lib/HandlerBase.cc#L92
### Modifications
Pass a `shared_ptr` instead of a `weak_ptr` in `HandlerBase::grabCnx` to
ensure the `connectionOpened` or `connectionFailed` callback is called
if `ClientImpl::getConnection` is called. We only need to pass a
`weak_ptr` in `scheduleReconnection` to skip the reconnection if the
handler is expired.
---
lib/ConsumerImplBase.h | 6 ++++--
lib/HandlerBase.cc | 38 +++++++++++++++++---------------------
lib/HandlerBase.h | 7 +------
lib/ProducerImpl.cc | 2 --
lib/ProducerImpl.h | 12 ++++++------
5 files changed, 28 insertions(+), 37 deletions(-)
diff --git a/lib/ConsumerImplBase.h b/lib/ConsumerImplBase.h
index 5116fd0..e3f9387 100644
--- a/lib/ConsumerImplBase.h
+++ b/lib/ConsumerImplBase.h
@@ -40,11 +40,14 @@ class OpBatchReceive {
const int64_t createAt_;
};
-class ConsumerImplBase : public HandlerBase, public
std::enable_shared_from_this<ConsumerImplBase> {
+class ConsumerImplBase : public HandlerBase {
public:
virtual ~ConsumerImplBase(){};
ConsumerImplBase(ClientImplPtr client, const std::string& topic, Backoff
backoff,
const ConsumerConfiguration& conf, ExecutorServicePtr
listenerExecutor);
+ std::shared_ptr<ConsumerImplBase> shared_from_this() noexcept {
+ return
std::dynamic_pointer_cast<ConsumerImplBase>(HandlerBase::shared_from_this());
+ }
// interface by consumer
virtual Future<Result, ConsumerImplBaseWeakPtr> getConsumerCreatedFuture()
= 0;
@@ -83,7 +86,6 @@ class ConsumerImplBase : public HandlerBase, public
std::enable_shared_from_this
// overrided methods from HandlerBase
void connectionOpened(const ClientConnectionPtr& cnx) override {}
void connectionFailed(Result result) override {}
- HandlerBaseWeakPtr get_weak_from_this() override { return
shared_from_this(); }
// consumer impl generic method.
ExecutorServicePtr listenerExecutor_;
diff --git a/lib/HandlerBase.cc b/lib/HandlerBase.cc
index 7c2d1ee..163c779 100644
--- a/lib/HandlerBase.cc
+++ b/lib/HandlerBase.cc
@@ -84,25 +84,18 @@ void HandlerBase::grabCnx() {
connectionFailed(ResultConnectError);
return;
}
- auto weakSelf = get_weak_from_this();
- client->getConnection(*topic_).addListener(
- [this, weakSelf](Result result, const ClientConnectionPtr& cnx) {
- auto self = weakSelf.lock();
- if (!self) {
- LOG_DEBUG("HandlerBase Weak reference is not valid anymore");
- return;
- }
-
- reconnectionPending_ = false;
-
- if (result == ResultOk) {
- LOG_DEBUG(getName() << "Connected to broker: " <<
cnx->cnxString());
- connectionOpened(cnx);
- } else {
- connectionFailed(result);
- scheduleReconnection();
- }
- });
+ auto self = shared_from_this();
+ client->getConnection(*topic_).addListener([this, self](Result result,
const ClientConnectionPtr& cnx) {
+ reconnectionPending_ = false;
+
+ if (result == ResultOk) {
+ LOG_DEBUG(getName() << "Connected to broker: " <<
cnx->cnxString());
+ connectionOpened(cnx);
+ } else {
+ connectionFailed(result);
+ scheduleReconnection();
+ }
+ });
}
void HandlerBase::handleDisconnection(Result result, const
ClientConnectionPtr& cnx) {
@@ -148,11 +141,14 @@ void HandlerBase::scheduleReconnection() {
timer_->expires_from_now(delay);
// passing shared_ptr here since time_ will get destroyed, so tasks
will be cancelled
// so we will not run into the case where grabCnx is invoked on out of
scope handler
- auto weakSelf = get_weak_from_this();
- timer_->async_wait([weakSelf](const boost::system::error_code& ec) {
+ auto name = getName();
+ std::weak_ptr<HandlerBase> weakSelf{shared_from_this()};
+ timer_->async_wait([name, weakSelf](const boost::system::error_code&
ec) {
auto self = weakSelf.lock();
if (self) {
self->handleTimeout(ec);
+ } else {
+ LOG_WARN(name << "Cancel the reconnection since the handler is
destroyed");
}
});
}
diff --git a/lib/HandlerBase.h b/lib/HandlerBase.h
index 2cd8dd1..71eaafe 100644
--- a/lib/HandlerBase.h
+++ b/lib/HandlerBase.h
@@ -33,9 +33,6 @@ using namespace boost::posix_time;
using boost::posix_time::milliseconds;
using boost::posix_time::seconds;
-class HandlerBase;
-typedef std::weak_ptr<HandlerBase> HandlerBaseWeakPtr;
-typedef std::shared_ptr<HandlerBase> HandlerBasePtr;
class ClientImpl;
using ClientImplPtr = std::shared_ptr<ClientImpl>;
using ClientImplWeakPtr = std::weak_ptr<ClientImpl>;
@@ -46,7 +43,7 @@ class ExecutorService;
using ExecutorServicePtr = std::shared_ptr<ExecutorService>;
using DeadlineTimerPtr = std::shared_ptr<boost::asio::deadline_timer>;
-class HandlerBase {
+class HandlerBase : public std::enable_shared_from_this<HandlerBase> {
public:
HandlerBase(const ClientImplPtr&, const std::string&, const Backoff&);
@@ -84,8 +81,6 @@ class HandlerBase {
virtual void connectionFailed(Result result) = 0;
- virtual HandlerBaseWeakPtr get_weak_from_this() = 0;
-
virtual const std::string& getName() const = 0;
private:
diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc
index bdcc757..6b3f5cb 100644
--- a/lib/ProducerImpl.cc
+++ b/lib/ProducerImpl.cc
@@ -1010,7 +1010,5 @@ void ProducerImpl::asyncWaitSendTimeout(DurationType
expiryTime) {
});
}
-ProducerImplWeakPtr ProducerImpl::weak_from_this() noexcept { return
shared_from_this(); }
-
} // namespace pulsar
/* namespace pulsar */
diff --git a/lib/ProducerImpl.h b/lib/ProducerImpl.h
index 8611bfe..aa4ba35 100644
--- a/lib/ProducerImpl.h
+++ b/lib/ProducerImpl.h
@@ -60,9 +60,7 @@ namespace proto {
class MessageMetadata;
} // namespace proto
-class ProducerImpl : public HandlerBase,
- public std::enable_shared_from_this<ProducerImpl>,
- public ProducerImplBase {
+class ProducerImpl : public HandlerBase, public ProducerImplBase {
public:
ProducerImpl(ClientImplPtr client, const TopicName& topic,
const ProducerConfiguration& producerConfiguration,
@@ -98,8 +96,11 @@ class ProducerImpl : public HandlerBase,
static int getNumOfChunks(uint32_t size, uint32_t maxMessageSize);
- // NOTE: this method is introduced into `enable_shared_from_this` since
C++17
- ProducerImplWeakPtr weak_from_this() noexcept;
+ ProducerImplPtr shared_from_this() noexcept {
+ return
std::dynamic_pointer_cast<ProducerImpl>(HandlerBase::shared_from_this());
+ }
+
+ ProducerImplWeakPtr weak_from_this() noexcept { return shared_from_this();
}
protected:
ProducerStatsBasePtr producerStatsBasePtr_;
@@ -121,7 +122,6 @@ class ProducerImpl : public HandlerBase,
void beforeConnectionChange(ClientConnection& connection) override;
void connectionOpened(const ClientConnectionPtr& connection) override;
void connectionFailed(Result result) override;
- HandlerBaseWeakPtr get_weak_from_this() override { return
shared_from_this(); }
const std::string& getName() const override { return producerStr_; }
private: