This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch branch-2.6 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit bcb90d2751203301e89a5adbf2a29d71a9073cfa Author: Isaiah Rairdon <[email protected]> AuthorDate: Tue Jul 28 08:22:08 2020 -0600 [CPP] Fix segfault crashes caused by a race condition on the timer in cpp client (#7572) * Kevin Wilson changes to fix segment crashes in pulsar * after merge with master, change following comments: ptr, unnecessary change * add lock to avoid concurrent access * Remove comments Co-authored-by: Isaiah Rairdon <[email protected]> Co-authored-by: Jia Zhai <[email protected]> Co-authored-by: Sijie Guo <[email protected]> Co-authored-by: xiaolong.ran <[email protected]> (cherry picked from commit 15d5254e49b96719638f9efec391a6beeed00bb9) --- pulsar-client-cpp/lib/ClientConnection.cc | 40 +++++++++++++++++++++---------- 1 file changed, 28 insertions(+), 12 deletions(-) diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc index 3628dd5..ebb7268 100644 --- a/pulsar-client-cpp/lib/ClientConnection.cc +++ b/pulsar-client-cpp/lib/ClientConnection.cc @@ -256,10 +256,14 @@ void ClientConnection::handlePulsarConnected(const CommandConnected& cmdConnecte if (serverProtocolVersion_ >= v1) { // Only send keep-alive probes if the broker supports it - DeadlineTimerPtr keepAliveTimer = executor_->createDeadlineTimer(); - keepAliveTimer->expires_from_now(boost::posix_time::seconds(KeepAliveIntervalInSeconds)); - keepAliveTimer->async_wait(std::bind(&ClientConnection::handleKeepAliveTimeout, shared_from_this())); - keepAliveTimer_ = keepAliveTimer; + keepAliveTimer_ = executor_->createDeadlineTimer(); + Lock lock(mutex_); + if (keepAliveTimer_) { + keepAliveTimer_->expires_from_now(boost::posix_time::seconds(KeepAliveIntervalInSeconds)); + keepAliveTimer_->async_wait( + std::bind(&ClientConnection::handleKeepAliveTimeout, shared_from_this())); + } + lock.unlock(); } if (serverProtocolVersion_ >= v8) { @@ -289,13 +293,14 @@ void ClientConnection::startConsumerStatsTimer(std::vector<uint64_t> consumerSta 
consumerStatsRequests.push_back(it->first); } - DeadlineTimerPtr timer = consumerStatsRequestTimer_; - if (timer) { - timer->expires_from_now(operationsTimeout_); - timer->async_wait(std::bind(&ClientConnection::handleConsumerStatsTimeout, shared_from_this(), - std::placeholders::_1, consumerStatsRequests)); + // If the close operation has reset the consumerStatsRequestTimer_ then the use_count will be zero + // Check if we have a timer still before we set the request timer to pop again. + if (consumerStatsRequestTimer_) { + consumerStatsRequestTimer_->expires_from_now(operationsTimeout_); + consumerStatsRequestTimer_->async_wait(std::bind(&ClientConnection::handleConsumerStatsTimeout, + shared_from_this(), std::placeholders::_1, + consumerStatsRequests)); } - lock.unlock(); // Complex logic since promises need to be fulfilled outside the lock for (int i = 0; i < consumerStatsPromises.size(); i++) { @@ -1344,8 +1349,15 @@ void ClientConnection::handleKeepAliveTimeout() { havePendingPingRequest_ = true; sendCommand(Commands::newPing()); - keepAliveTimer_->expires_from_now(boost::posix_time::seconds(KeepAliveIntervalInSeconds)); - keepAliveTimer_->async_wait(std::bind(&ClientConnection::handleKeepAliveTimeout, shared_from_this())); + // If the close operation has already called the keepAliveTimer_.reset() then the use_count will be + // zero And we do not attempt to dereference the pointer. 
+ Lock lock(mutex_); + if (keepAliveTimer_) { + keepAliveTimer_->expires_from_now(boost::posix_time::seconds(KeepAliveIntervalInSeconds)); + keepAliveTimer_->async_wait( + std::bind(&ClientConnection::handleKeepAliveTimeout, shared_from_this())); + } + lock.unlock(); } } @@ -1375,13 +1387,17 @@ void ClientConnection::close() { LOG_INFO(cnxString_ << "Connection closed"); if (keepAliveTimer_) { + lock.lock(); keepAliveTimer_->cancel(); keepAliveTimer_.reset(); + lock.unlock(); } if (consumerStatsRequestTimer_) { + lock.lock(); consumerStatsRequestTimer_->cancel(); consumerStatsRequestTimer_.reset(); + lock.unlock(); } for (ProducersMap::iterator it = producers.begin(); it != producers.end(); ++it) {
