This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch branch-3.1 in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
commit 9a4b0d17d618c540fa6773484d540150369881ad Author: erobot <[email protected]> AuthorDate: Sat Dec 3 02:29:44 2022 +0800 [fix] Fix a crash when closing a connection while connecting (#136) --- lib/ClientConnection.cc | 20 +++++++++++++++++--- tests/ClientTest.cc | 15 +++++++++++++++ 2 files changed, 32 insertions(+), 3 deletions(-) diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc index 48b88b5..5c14467 100644 --- a/lib/ClientConnection.cc +++ b/lib/ClientConnection.cc @@ -279,23 +279,30 @@ void ClientConnection::handlePulsarConnected(const proto::CommandConnected& cmdC LOG_DEBUG("Current max message size is: " << maxMessageSize_); } + Lock lock(mutex_); + + if (isClosed()) { + LOG_INFO(cnxString_ << "Connection already closed"); + return; + } state_ = Ready; connectTimeoutTask_->stop(); serverProtocolVersion_ = cmdConnected.protocol_version(); - connectPromise_.setValue(shared_from_this()); if (serverProtocolVersion_ >= proto::v1) { // Only send keep-alive probes if the broker supports it keepAliveTimer_ = executor_->createDeadlineTimer(); - Lock lock(mutex_); if (keepAliveTimer_) { keepAliveTimer_->expires_from_now(boost::posix_time::seconds(KeepAliveIntervalInSeconds)); keepAliveTimer_->async_wait( std::bind(&ClientConnection::handleKeepAliveTimeout, shared_from_this())); } - lock.unlock(); } + lock.unlock(); + + connectPromise_.setValue(shared_from_this()); + if (serverProtocolVersion_ >= proto::v8) { startConsumerStatsTimer(std::vector<uint64_t>()); } @@ -380,7 +387,14 @@ void ClientConnection::handleTcpConnected(const boost::system::error_code& err, } else { LOG_INFO(cnxString_ << "Connected to broker through proxy. Logical broker: " << logicalAddress_); } + + Lock lock(mutex_); + if (isClosed()) { + LOG_INFO(cnxString_ << "Connection already closed"); + return; + } state_ = TcpConnected; + lock.unlock(); boost::system::error_code error; socket_->set_option(tcp::no_delay(true), error); diff --git a/tests/ClientTest.cc b/tests/ClientTest.cc index bdb0555..a4abc3a 100644 --- a/tests/ClientTest.cc +++ b/tests/ClientTest.cc @@ -19,6 +19,7 @@ #include <gtest/gtest.h> #include <pulsar/Client.h> +#include <chrono> #include <future> #include "HttpHelper.h" @@ -293,3 +294,17 @@ TEST(ClientTest, testMultiBrokerUrl) { ASSERT_EQ(ResultOk, client.createReader(topic, MessageId::earliest(), {}, reader)); client.close(); } + +TEST(ClientTest, testCloseClient) { + const std::string topic = "client-test-close-client-" + std::to_string(time(nullptr)); + + for (int i = 0; i < 1000; ++i) { + Client client(lookupUrl); + client.createProducerAsync(topic, [](Result result, Producer producer) { producer.close(); }); + // simulate different time interval before close + auto t0 = std::chrono::steady_clock::now(); + while ((std::chrono::steady_clock::now() - t0) < std::chrono::microseconds(i)) { + } + client.close(); + } +}
