This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 08a6e9b621311cee0ceedf638d12ac499c870d6f
Author: Matteo Merli <[email protected]>
AuthorDate: Tue Mar 8 21:33:27 2022 -0800

    [C++] Handle exception in creating socket when fd limit is reached (#14587)

    (cherry picked from commit babae8e98a172302aee0bb3790b0f4e4128a7c35)
---
 pulsar-client-cpp/lib/ClientConnection.cc | 28 ++++++++++++++++++++--------
 1 file changed, 20 insertions(+), 8 deletions(-)

diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc
index d246bf8..cf12f29 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -161,7 +161,6 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:
       serverProtocolVersion_(ProtocolVersion_MIN),
       executor_(executor),
       resolver_(executor_->createTcpResolver()),
-      socket_(executor_->createSocket()),
 #if BOOST_VERSION >= 107000
       strand_(boost::asio::make_strand(executor_->getIOService().get_executor())),
 #elif BOOST_VERSION >= 106600
@@ -173,12 +172,20 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:
       physicalAddress_(physicalAddress),
       cnxString_("[<none> -> " + physicalAddress + "] "),
       incomingBuffer_(SharedBuffer::allocate(DefaultBufferSize)),
-      connectTimeoutTask_(std::make_shared<PeriodicTask>(executor_->getIOService(),
-                                                         clientConfiguration.getConnectionTimeout())),
       outgoingBuffer_(SharedBuffer::allocate(DefaultBufferSize)),
-      consumerStatsRequestTimer_(executor_->createDeadlineTimer()),
       maxPendingLookupRequest_(clientConfiguration.getConcurrentLookupRequest()) {
+    try {
+        socket_ = executor_->createSocket();
+        connectTimeoutTask_ = std::make_shared<PeriodicTask>(executor_->getIOService(),
+                                                             clientConfiguration.getConnectionTimeout());
+        consumerStatsRequestTimer_ = executor_->createDeadlineTimer();
+    } catch (const boost::system::system_error& e) {
+        LOG_ERROR("Failed to initialize connection: " << e.what());
+        close();
+        return;
+    }
+
     LOG_INFO(cnxString_ <<
              "Create ClientConnection, timeout=" << clientConfiguration.getConnectionTimeout());
     if (clientConfiguration.isUseTls()) {
 #if BOOST_VERSION >= 105400
@@ -1505,9 +1512,11 @@ void ClientConnection::close(Result result) {
     }
     state_ = Disconnected;
     boost::system::error_code err;
-    socket_->close(err);
-    if (err) {
-        LOG_WARN(cnxString_ << "Failed to close socket: " << err.message());
+    if (socket_) {
+        socket_->close(err);
+        if (err) {
+            LOG_WARN(cnxString_ << "Failed to close socket: " << err.message());
+        }
     }

     if (tlsSocket_) {
@@ -1542,7 +1551,10 @@ void ClientConnection::close(Result result) {
         consumerStatsRequestTimer_.reset();
     }

-    connectTimeoutTask_->stop();
+    if (connectTimeoutTask_) {
+        connectTimeoutTask_->stop();
+        connectTimeoutTask_.reset();
+    }

     lock.unlock();
     LOG_INFO(cnxString_ << "Connection closed");
