This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6eb7484  [C++] Fixed triggering timers when the connection is being closed (#6287)
6eb7484 is described below

commit 6eb74848f8baa5859bf44a1ca8caff1ad891cfb5
Author: Matteo Merli <[email protected]>
AuthorDate: Tue Feb 11 06:42:15 2020 -0800

    [C++] Fixed triggering timers when the connection is being closed (#6287)
    
    * [C++] Fixed triggering timers when the connection is being closed
    
    * Fixed cpp formatting
---
 pulsar-client-cpp/lib/ClientConnection.cc | 19 ++++++++++++-------
 1 file changed, 12 insertions(+), 7 deletions(-)

diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc
index be83370..e40f2f8 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -253,9 +253,10 @@ void ClientConnection::handlePulsarConnected(const CommandConnected& cmdConnecte
 
     if (serverProtocolVersion_ >= v1) {
         // Only send keep-alive probes if the broker supports it
-        keepAliveTimer_ = executor_->createDeadlineTimer();
-        keepAliveTimer_->expires_from_now(boost::posix_time::seconds(KeepAliveIntervalInSeconds));
-        keepAliveTimer_->async_wait(std::bind(&ClientConnection::handleKeepAliveTimeout, shared_from_this()));
+        DeadlineTimerPtr keepAliveTimer = executor_->createDeadlineTimer();
+        keepAliveTimer->expires_from_now(boost::posix_time::seconds(KeepAliveIntervalInSeconds));
+        keepAliveTimer->async_wait(std::bind(&ClientConnection::handleKeepAliveTimeout, shared_from_this()));
+        keepAliveTimer_ = keepAliveTimer;
     }
 
     if (serverProtocolVersion_ >= v8) {
@@ -284,10 +285,14 @@ void ClientConnection::startConsumerStatsTimer(std::vector<uint64_t> consumerSta
          it != pendingConsumerStatsMap_.end(); ++it) {
         consumerStatsRequests.push_back(it->first);
     }
-    consumerStatsRequestTimer_->expires_from_now(operationsTimeout_);
-    consumerStatsRequestTimer_->async_wait(std::bind(&ClientConnection::handleConsumerStatsTimeout,
-                                                     shared_from_this(), std::placeholders::_1,
-                                                     consumerStatsRequests));
+
+    DeadlineTimerPtr timer = consumerStatsRequestTimer_;
+    if (timer) {
+        timer->expires_from_now(operationsTimeout_);
+        timer->async_wait(std::bind(&ClientConnection::handleConsumerStatsTimeout, shared_from_this(),
+                                    std::placeholders::_1, consumerStatsRequests));
+    }
+
     lock.unlock();
     // Complex logic since promises need to be fulfilled outside the lock
     for (int i = 0; i < consumerStatsPromises.size(); i++) {

Reply via email to