This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 6eb7484 [C++] Fixed triggering timers when the connection is being closed (#6287)
6eb7484 is described below
commit 6eb74848f8baa5859bf44a1ca8caff1ad891cfb5
Author: Matteo Merli <[email protected]>
AuthorDate: Tue Feb 11 06:42:15 2020 -0800
[C++] Fixed triggering timers when the connection is being closed (#6287)
* [C++] Fixed triggering timers when the connection is being closed
* Fixed cpp formatting
---
pulsar-client-cpp/lib/ClientConnection.cc | 19 ++++++++++++-------
1 file changed, 12 insertions(+), 7 deletions(-)
diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc
index be83370..e40f2f8 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -253,9 +253,10 @@ void ClientConnection::handlePulsarConnected(const CommandConnected& cmdConnecte
if (serverProtocolVersion_ >= v1) {
// Only send keep-alive probes if the broker supports it
- keepAliveTimer_ = executor_->createDeadlineTimer();
- keepAliveTimer_->expires_from_now(boost::posix_time::seconds(KeepAliveIntervalInSeconds));
- keepAliveTimer_->async_wait(std::bind(&ClientConnection::handleKeepAliveTimeout, shared_from_this()));
+ DeadlineTimerPtr keepAliveTimer = executor_->createDeadlineTimer();
+ keepAliveTimer->expires_from_now(boost::posix_time::seconds(KeepAliveIntervalInSeconds));
+ keepAliveTimer->async_wait(std::bind(&ClientConnection::handleKeepAliveTimeout, shared_from_this()));
+ keepAliveTimer_ = keepAliveTimer;
}
if (serverProtocolVersion_ >= v8) {
@@ -284,10 +285,14 @@ void ClientConnection::startConsumerStatsTimer(std::vector<uint64_t> consumerSta
it != pendingConsumerStatsMap_.end(); ++it) {
consumerStatsRequests.push_back(it->first);
}
- consumerStatsRequestTimer_->expires_from_now(operationsTimeout_);
- consumerStatsRequestTimer_->async_wait(std::bind(&ClientConnection::handleConsumerStatsTimeout,
- shared_from_this(), std::placeholders::_1,
- consumerStatsRequests));
+
+ DeadlineTimerPtr timer = consumerStatsRequestTimer_;
+ if (timer) {
+ timer->expires_from_now(operationsTimeout_);
+ timer->async_wait(std::bind(&ClientConnection::handleConsumerStatsTimeout, shared_from_this(),
+ std::placeholders::_1, consumerStatsRequests));
+ }
+
lock.unlock();
// Complex logic since promises need to be fulfilled outside the lock
for (int i = 0; i < consumerStatsPromises.size(); i++) {