This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new af45a54  Fix the handler instance is expired when the connection is 
established (#323)
af45a54 is described below

commit af45a54c10ec5b06e80b683010afd3531457ac64
Author: Yunze Xu <[email protected]>
AuthorDate: Thu Oct 5 00:23:25 2023 +0800

    Fix the handler instance is expired when the connection is established 
(#323)
    
    ### Motivation
    
    We observed some logs that showed the handler instance is expired when
    the connection is established after a reconnection:
    
    ```
    HandlerBase Weak reference is not valid anymore
    ```
    
    
https://github.com/apache/pulsar-client-cpp/blob/b35ae1aa4b9834886c0889635de81834f9b2f774/lib/HandlerBase.cc#L92
    
    ### Modifications
    
    Pass a `shared_ptr` instead of a `weak_ptr` in `HandlerBase::grabCnx` to
    ensure the `connectionOpened` or `connectionFailed` callback is called
    if `ClientImpl::getConnection` is called. We only need to pass a
    `weak_ptr` in `scheduleReconnection` to skip the reconnection if the
    handler is expired.
---
 lib/ConsumerImplBase.h |  6 ++++--
 lib/HandlerBase.cc     | 38 +++++++++++++++++---------------------
 lib/HandlerBase.h      |  7 +------
 lib/ProducerImpl.cc    |  2 --
 lib/ProducerImpl.h     | 12 ++++++------
 5 files changed, 28 insertions(+), 37 deletions(-)

diff --git a/lib/ConsumerImplBase.h b/lib/ConsumerImplBase.h
index 5116fd0..e3f9387 100644
--- a/lib/ConsumerImplBase.h
+++ b/lib/ConsumerImplBase.h
@@ -40,11 +40,14 @@ class OpBatchReceive {
     const int64_t createAt_;
 };
 
-class ConsumerImplBase : public HandlerBase, public 
std::enable_shared_from_this<ConsumerImplBase> {
+class ConsumerImplBase : public HandlerBase {
    public:
     virtual ~ConsumerImplBase(){};
     ConsumerImplBase(ClientImplPtr client, const std::string& topic, Backoff 
backoff,
                      const ConsumerConfiguration& conf, ExecutorServicePtr 
listenerExecutor);
+    std::shared_ptr<ConsumerImplBase> shared_from_this() noexcept {
+        return 
std::dynamic_pointer_cast<ConsumerImplBase>(HandlerBase::shared_from_this());
+    }
 
     // interface by consumer
     virtual Future<Result, ConsumerImplBaseWeakPtr> getConsumerCreatedFuture() 
= 0;
@@ -83,7 +86,6 @@ class ConsumerImplBase : public HandlerBase, public 
std::enable_shared_from_this
     // overrided methods from HandlerBase
     void connectionOpened(const ClientConnectionPtr& cnx) override {}
     void connectionFailed(Result result) override {}
-    HandlerBaseWeakPtr get_weak_from_this() override { return 
shared_from_this(); }
 
     // consumer impl generic method.
     ExecutorServicePtr listenerExecutor_;
diff --git a/lib/HandlerBase.cc b/lib/HandlerBase.cc
index 7c2d1ee..163c779 100644
--- a/lib/HandlerBase.cc
+++ b/lib/HandlerBase.cc
@@ -84,25 +84,18 @@ void HandlerBase::grabCnx() {
         connectionFailed(ResultConnectError);
         return;
     }
-    auto weakSelf = get_weak_from_this();
-    client->getConnection(*topic_).addListener(
-        [this, weakSelf](Result result, const ClientConnectionPtr& cnx) {
-            auto self = weakSelf.lock();
-            if (!self) {
-                LOG_DEBUG("HandlerBase Weak reference is not valid anymore");
-                return;
-            }
-
-            reconnectionPending_ = false;
-
-            if (result == ResultOk) {
-                LOG_DEBUG(getName() << "Connected to broker: " << 
cnx->cnxString());
-                connectionOpened(cnx);
-            } else {
-                connectionFailed(result);
-                scheduleReconnection();
-            }
-        });
+    auto self = shared_from_this();
+    client->getConnection(*topic_).addListener([this, self](Result result, 
const ClientConnectionPtr& cnx) {
+        reconnectionPending_ = false;
+
+        if (result == ResultOk) {
+            LOG_DEBUG(getName() << "Connected to broker: " << 
cnx->cnxString());
+            connectionOpened(cnx);
+        } else {
+            connectionFailed(result);
+            scheduleReconnection();
+        }
+    });
 }
 
 void HandlerBase::handleDisconnection(Result result, const 
ClientConnectionPtr& cnx) {
@@ -148,11 +141,14 @@ void HandlerBase::scheduleReconnection() {
         timer_->expires_from_now(delay);
         // passing shared_ptr here since time_ will get destroyed, so tasks 
will be cancelled
         // so we will not run into the case where grabCnx is invoked on out of 
scope handler
-        auto weakSelf = get_weak_from_this();
-        timer_->async_wait([weakSelf](const boost::system::error_code& ec) {
+        auto name = getName();
+        std::weak_ptr<HandlerBase> weakSelf{shared_from_this()};
+        timer_->async_wait([name, weakSelf](const boost::system::error_code& 
ec) {
             auto self = weakSelf.lock();
             if (self) {
                 self->handleTimeout(ec);
+            } else {
+                LOG_WARN(name << "Cancel the reconnection since the handler is 
destroyed");
             }
         });
     }
diff --git a/lib/HandlerBase.h b/lib/HandlerBase.h
index 2cd8dd1..71eaafe 100644
--- a/lib/HandlerBase.h
+++ b/lib/HandlerBase.h
@@ -33,9 +33,6 @@ using namespace boost::posix_time;
 using boost::posix_time::milliseconds;
 using boost::posix_time::seconds;
 
-class HandlerBase;
-typedef std::weak_ptr<HandlerBase> HandlerBaseWeakPtr;
-typedef std::shared_ptr<HandlerBase> HandlerBasePtr;
 class ClientImpl;
 using ClientImplPtr = std::shared_ptr<ClientImpl>;
 using ClientImplWeakPtr = std::weak_ptr<ClientImpl>;
@@ -46,7 +43,7 @@ class ExecutorService;
 using ExecutorServicePtr = std::shared_ptr<ExecutorService>;
 using DeadlineTimerPtr = std::shared_ptr<boost::asio::deadline_timer>;
 
-class HandlerBase {
+class HandlerBase : public std::enable_shared_from_this<HandlerBase> {
    public:
     HandlerBase(const ClientImplPtr&, const std::string&, const Backoff&);
 
@@ -84,8 +81,6 @@ class HandlerBase {
 
     virtual void connectionFailed(Result result) = 0;
 
-    virtual HandlerBaseWeakPtr get_weak_from_this() = 0;
-
     virtual const std::string& getName() const = 0;
 
    private:
diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc
index bdcc757..6b3f5cb 100644
--- a/lib/ProducerImpl.cc
+++ b/lib/ProducerImpl.cc
@@ -1010,7 +1010,5 @@ void ProducerImpl::asyncWaitSendTimeout(DurationType 
expiryTime) {
     });
 }
 
-ProducerImplWeakPtr ProducerImpl::weak_from_this() noexcept { return 
shared_from_this(); }
-
 }  // namespace pulsar
 /* namespace pulsar */
diff --git a/lib/ProducerImpl.h b/lib/ProducerImpl.h
index 8611bfe..aa4ba35 100644
--- a/lib/ProducerImpl.h
+++ b/lib/ProducerImpl.h
@@ -60,9 +60,7 @@ namespace proto {
 class MessageMetadata;
 }  // namespace proto
 
-class ProducerImpl : public HandlerBase,
-                     public std::enable_shared_from_this<ProducerImpl>,
-                     public ProducerImplBase {
+class ProducerImpl : public HandlerBase, public ProducerImplBase {
    public:
     ProducerImpl(ClientImplPtr client, const TopicName& topic,
                  const ProducerConfiguration& producerConfiguration,
@@ -98,8 +96,11 @@ class ProducerImpl : public HandlerBase,
 
     static int getNumOfChunks(uint32_t size, uint32_t maxMessageSize);
 
-    // NOTE: this method is introduced into `enable_shared_from_this` since 
C++17
-    ProducerImplWeakPtr weak_from_this() noexcept;
+    ProducerImplPtr shared_from_this() noexcept {
+        return 
std::dynamic_pointer_cast<ProducerImpl>(HandlerBase::shared_from_this());
+    }
+
+    ProducerImplWeakPtr weak_from_this() noexcept { return shared_from_this(); 
}
 
    protected:
     ProducerStatsBasePtr producerStatsBasePtr_;
@@ -121,7 +122,6 @@ class ProducerImpl : public HandlerBase,
     void beforeConnectionChange(ClientConnection& connection) override;
     void connectionOpened(const ClientConnectionPtr& connection) override;
     void connectionFailed(Result result) override;
-    HandlerBaseWeakPtr get_weak_from_this() override { return 
shared_from_this(); }
     const std::string& getName() const override { return producerStr_; }
 
    private:

Reply via email to