This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 15d5254  [CPP] Fix segmentation-fault crashes caused by a timer race 
condition in the cpp client (#7572)
15d5254 is described below

commit 15d5254e49b96719638f9efec391a6beeed00bb9
Author: Isaiah Rairdon <[email protected]>
AuthorDate: Tue Jul 28 08:22:08 2020 -0600

    [CPP] Fix segmentation-fault crashes caused by a timer race condition in 
the cpp client (#7572)
    
    * Kevin Wilson changes to fix segment crashes in pulsar
    
    * after merge with master, change following comments: ptr, unnecessary 
change
    
    * add lock to avoid concurrent access
    
    * Remove comments
    
    Co-authored-by: Isaiah Rairdon <[email protected]>
    Co-authored-by: Jia Zhai <[email protected]>
    Co-authored-by: Sijie Guo <[email protected]>
    Co-authored-by: xiaolong.ran <[email protected]>
---
 pulsar-client-cpp/lib/ClientConnection.cc | 40 +++++++++++++++++++++----------
 1 file changed, 28 insertions(+), 12 deletions(-)

diff --git a/pulsar-client-cpp/lib/ClientConnection.cc 
b/pulsar-client-cpp/lib/ClientConnection.cc
index 3628dd5..ebb7268 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -256,10 +256,14 @@ void ClientConnection::handlePulsarConnected(const 
CommandConnected& cmdConnecte
 
     if (serverProtocolVersion_ >= v1) {
         // Only send keep-alive probes if the broker supports it
-        DeadlineTimerPtr keepAliveTimer = executor_->createDeadlineTimer();
-        
keepAliveTimer->expires_from_now(boost::posix_time::seconds(KeepAliveIntervalInSeconds));
-        
keepAliveTimer->async_wait(std::bind(&ClientConnection::handleKeepAliveTimeout, 
shared_from_this()));
-        keepAliveTimer_ = keepAliveTimer;
+        keepAliveTimer_ = executor_->createDeadlineTimer();
+        Lock lock(mutex_);
+        if (keepAliveTimer_) {
+            
keepAliveTimer_->expires_from_now(boost::posix_time::seconds(KeepAliveIntervalInSeconds));
+            keepAliveTimer_->async_wait(
+                std::bind(&ClientConnection::handleKeepAliveTimeout, 
shared_from_this()));
+        }
+        lock.unlock();
     }
 
     if (serverProtocolVersion_ >= v8) {
@@ -289,13 +293,14 @@ void 
ClientConnection::startConsumerStatsTimer(std::vector<uint64_t> consumerSta
         consumerStatsRequests.push_back(it->first);
     }
 
-    DeadlineTimerPtr timer = consumerStatsRequestTimer_;
-    if (timer) {
-        timer->expires_from_now(operationsTimeout_);
-        
timer->async_wait(std::bind(&ClientConnection::handleConsumerStatsTimeout, 
shared_from_this(),
-                                    std::placeholders::_1, 
consumerStatsRequests));
+    // If close() has already reset consumerStatsRequestTimer_, the pointer is empty (use_count is zero).
+    // Check that the timer still exists before re-arming it for the next timeout.
+    if (consumerStatsRequestTimer_) {
+        consumerStatsRequestTimer_->expires_from_now(operationsTimeout_);
+        
consumerStatsRequestTimer_->async_wait(std::bind(&ClientConnection::handleConsumerStatsTimeout,
+                                                         shared_from_this(), 
std::placeholders::_1,
+                                                         
consumerStatsRequests));
     }
-
     lock.unlock();
     // Complex logic since promises need to be fulfilled outside the lock
     for (int i = 0; i < consumerStatsPromises.size(); i++) {
@@ -1344,8 +1349,15 @@ void ClientConnection::handleKeepAliveTimeout() {
         havePendingPingRequest_ = true;
         sendCommand(Commands::newPing());
 
-        
keepAliveTimer_->expires_from_now(boost::posix_time::seconds(KeepAliveIntervalInSeconds));
-        
keepAliveTimer_->async_wait(std::bind(&ClientConnection::handleKeepAliveTimeout,
 shared_from_this()));
+        // If close() has already called keepAliveTimer_.reset(), the pointer is empty
+        // (use_count is zero), so we must not attempt to dereference it.
+        Lock lock(mutex_);
+        if (keepAliveTimer_) {
+            
keepAliveTimer_->expires_from_now(boost::posix_time::seconds(KeepAliveIntervalInSeconds));
+            keepAliveTimer_->async_wait(
+                std::bind(&ClientConnection::handleKeepAliveTimeout, 
shared_from_this()));
+        }
+        lock.unlock();
     }
 }
 
@@ -1375,13 +1387,17 @@ void ClientConnection::close() {
     LOG_INFO(cnxString_ << "Connection closed");
 
     if (keepAliveTimer_) {
+        lock.lock();
         keepAliveTimer_->cancel();
         keepAliveTimer_.reset();
+        lock.unlock();
     }
 
     if (consumerStatsRequestTimer_) {
+        lock.lock();
         consumerStatsRequestTimer_->cancel();
         consumerStatsRequestTimer_.reset();
+        lock.unlock();
     }
 
     for (ProducersMap::iterator it = producers.begin(); it != producers.end(); 
++it) {

Reply via email to