This is an automated email from the ASF dual-hosted git repository. xyz pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit f3266034816dc5f102fc1f4becf21bc7706aa899 Author: Matteo Merli <[email protected]> AuthorDate: Tue Mar 8 21:33:27 2022 -0800 [C++] Handle exception in creating socket when fd limit is reached (#14587) (cherry picked from commit babae8e98a172302aee0bb3790b0f4e4128a7c35) --- pulsar-client-cpp/lib/ClientConnection.cc | 28 ++++++++++++++++++++-------- 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc index 9f4ae6d..3a52191 100644 --- a/pulsar-client-cpp/lib/ClientConnection.cc +++ b/pulsar-client-cpp/lib/ClientConnection.cc @@ -161,7 +161,6 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std: serverProtocolVersion_(ProtocolVersion_MIN), executor_(executor), resolver_(executor_->createTcpResolver()), - socket_(executor_->createSocket()), #if BOOST_VERSION >= 107000 strand_(boost::asio::make_strand(executor_->getIOService().get_executor())), #elif BOOST_VERSION >= 106600 @@ -173,12 +172,20 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std: physicalAddress_(physicalAddress), cnxString_("[<none> -> " + physicalAddress + "] "), incomingBuffer_(SharedBuffer::allocate(DefaultBufferSize)), - connectTimeoutTask_(std::make_shared<PeriodicTask>(executor_->getIOService(), - clientConfiguration.getConnectionTimeout())), outgoingBuffer_(SharedBuffer::allocate(DefaultBufferSize)), - consumerStatsRequestTimer_(executor_->createDeadlineTimer()), maxPendingLookupRequest_(clientConfiguration.getConcurrentLookupRequest()) { + try { + socket_ = executor_->createSocket(); + connectTimeoutTask_ = std::make_shared<PeriodicTask>(executor_->getIOService(), + clientConfiguration.getConnectionTimeout()); + consumerStatsRequestTimer_ = executor_->createDeadlineTimer(); + } catch (const boost::system::system_error& e) { + LOG_ERROR("Failed to initialize connection: " << e.what()); + close(); + return; + } + LOG_INFO(cnxString_ << "Create ClientConnection, timeout=" << clientConfiguration.getConnectionTimeout()); if (clientConfiguration.isUseTls()) { #if BOOST_VERSION >= 105400 @@ -1480,9 +1487,11 @@ void ClientConnection::close(Result result) { } state_ = Disconnected; boost::system::error_code err; - socket_->close(err); - if (err) { - LOG_WARN(cnxString_ << "Failed to close socket: " << err.message()); + if (socket_) { + socket_->close(err); + if (err) { + LOG_WARN(cnxString_ << "Failed to close socket: " << err.message()); + } } if (tlsSocket_) { @@ -1517,7 +1526,10 @@ void ClientConnection::close(Result result) { consumerStatsRequestTimer_.reset(); } - connectTimeoutTask_->stop(); + if (connectTimeoutTask_) { + connectTimeoutTask_->stop(); + connectTimeoutTask_.reset(); + } lock.unlock(); LOG_INFO(cnxString_ << "Connection closed");
