This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 15d5254 [CPP] Fix segmentation-fault crashes caused by a race condition on
timers in the C++ client (#7572)
15d5254 is described below
commit 15d5254e49b96719638f9efec391a6beeed00bb9
Author: Isaiah Rairdon <[email protected]>
AuthorDate: Tue Jul 28 08:22:08 2020 -0600
[CPP] Fix segmentation-fault crashes caused by a race condition on timers in
the C++ client (#7572)
* Kevin Wilson's changes to fix segmentation-fault crashes in Pulsar
* After merging with master, address review comments: ptr, unnecessary
change
* Add a lock to avoid concurrent access
* Remove comments
Co-authored-by: Isaiah Rairdon <[email protected]>
Co-authored-by: Jia Zhai <[email protected]>
Co-authored-by: Sijie Guo <[email protected]>
Co-authored-by: xiaolong.ran <[email protected]>
---
pulsar-client-cpp/lib/ClientConnection.cc | 40 +++++++++++++++++++++----------
1 file changed, 28 insertions(+), 12 deletions(-)
diff --git a/pulsar-client-cpp/lib/ClientConnection.cc
b/pulsar-client-cpp/lib/ClientConnection.cc
index 3628dd5..ebb7268 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -256,10 +256,14 @@ void ClientConnection::handlePulsarConnected(const
CommandConnected& cmdConnecte
if (serverProtocolVersion_ >= v1) {
// Only send keep-alive probes if the broker supports it
- DeadlineTimerPtr keepAliveTimer = executor_->createDeadlineTimer();
-
keepAliveTimer->expires_from_now(boost::posix_time::seconds(KeepAliveIntervalInSeconds));
-
keepAliveTimer->async_wait(std::bind(&ClientConnection::handleKeepAliveTimeout,
shared_from_this()));
- keepAliveTimer_ = keepAliveTimer;
+ keepAliveTimer_ = executor_->createDeadlineTimer();
+ Lock lock(mutex_);
+ if (keepAliveTimer_) {
+
keepAliveTimer_->expires_from_now(boost::posix_time::seconds(KeepAliveIntervalInSeconds));
+ keepAliveTimer_->async_wait(
+ std::bind(&ClientConnection::handleKeepAliveTimeout,
shared_from_this()));
+ }
+ lock.unlock();
}
if (serverProtocolVersion_ >= v8) {
@@ -289,13 +293,14 @@ void
ClientConnection::startConsumerStatsTimer(std::vector<uint64_t> consumerSta
consumerStatsRequests.push_back(it->first);
}
- DeadlineTimerPtr timer = consumerStatsRequestTimer_;
- if (timer) {
- timer->expires_from_now(operationsTimeout_);
-
timer->async_wait(std::bind(&ClientConnection::handleConsumerStatsTimeout,
shared_from_this(),
- std::placeholders::_1,
consumerStatsRequests));
+ // If the close operation has reset the consumerStatsRequestTimer_ then
the use_count will be zero
+ // Check if we have a timer still before we set the request timer to pop
again.
+ if (consumerStatsRequestTimer_) {
+ consumerStatsRequestTimer_->expires_from_now(operationsTimeout_);
+
consumerStatsRequestTimer_->async_wait(std::bind(&ClientConnection::handleConsumerStatsTimeout,
+ shared_from_this(),
std::placeholders::_1,
+
consumerStatsRequests));
}
-
lock.unlock();
// Complex logic since promises need to be fulfilled outside the lock
for (int i = 0; i < consumerStatsPromises.size(); i++) {
@@ -1344,8 +1349,15 @@ void ClientConnection::handleKeepAliveTimeout() {
havePendingPingRequest_ = true;
sendCommand(Commands::newPing());
-
keepAliveTimer_->expires_from_now(boost::posix_time::seconds(KeepAliveIntervalInSeconds));
-
keepAliveTimer_->async_wait(std::bind(&ClientConnection::handleKeepAliveTimeout,
shared_from_this()));
+ // If the close operation has already called the
keepAliveTimer_.reset() then the use_count will be
+ // zero And we do not attempt to dereference the pointer.
+ Lock lock(mutex_);
+ if (keepAliveTimer_) {
+
keepAliveTimer_->expires_from_now(boost::posix_time::seconds(KeepAliveIntervalInSeconds));
+ keepAliveTimer_->async_wait(
+ std::bind(&ClientConnection::handleKeepAliveTimeout,
shared_from_this()));
+ }
+ lock.unlock();
}
}
@@ -1375,13 +1387,17 @@ void ClientConnection::close() {
LOG_INFO(cnxString_ << "Connection closed");
if (keepAliveTimer_) {
+ lock.lock();
keepAliveTimer_->cancel();
keepAliveTimer_.reset();
+ lock.unlock();
}
if (consumerStatsRequestTimer_) {
+ lock.lock();
consumerStatsRequestTimer_->cancel();
consumerStatsRequestTimer_.reset();
+ lock.unlock();
}
for (ProducersMap::iterator it = producers.begin(); it != producers.end();
++it) {