This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 08a6e9b621311cee0ceedf638d12ac499c870d6f
Author: Matteo Merli <[email protected]>
AuthorDate: Tue Mar 8 21:33:27 2022 -0800

    [C++] Handle exception in creating socket when fd limit is reached (#14587)
    
    (cherry picked from commit babae8e98a172302aee0bb3790b0f4e4128a7c35)
---
 pulsar-client-cpp/lib/ClientConnection.cc | 28 ++++++++++++++++++++--------
 1 file changed, 20 insertions(+), 8 deletions(-)

diff --git a/pulsar-client-cpp/lib/ClientConnection.cc 
b/pulsar-client-cpp/lib/ClientConnection.cc
index d246bf8..cf12f29 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -161,7 +161,6 @@ ClientConnection::ClientConnection(const std::string& 
logicalAddress, const std:
       serverProtocolVersion_(ProtocolVersion_MIN),
       executor_(executor),
       resolver_(executor_->createTcpResolver()),
-      socket_(executor_->createSocket()),
 #if BOOST_VERSION >= 107000
       
strand_(boost::asio::make_strand(executor_->getIOService().get_executor())),
 #elif BOOST_VERSION >= 106600
@@ -173,12 +172,20 @@ ClientConnection::ClientConnection(const std::string& 
logicalAddress, const std:
       physicalAddress_(physicalAddress),
       cnxString_("[<none> -> " + physicalAddress + "] "),
       incomingBuffer_(SharedBuffer::allocate(DefaultBufferSize)),
-      
connectTimeoutTask_(std::make_shared<PeriodicTask>(executor_->getIOService(),
-                                                         
clientConfiguration.getConnectionTimeout())),
       outgoingBuffer_(SharedBuffer::allocate(DefaultBufferSize)),
-      consumerStatsRequestTimer_(executor_->createDeadlineTimer()),
       
maxPendingLookupRequest_(clientConfiguration.getConcurrentLookupRequest()) {
 
+    try {
+        socket_ = executor_->createSocket();
+        connectTimeoutTask_ = 
std::make_shared<PeriodicTask>(executor_->getIOService(),
+                                                             
clientConfiguration.getConnectionTimeout());
+        consumerStatsRequestTimer_ = executor_->createDeadlineTimer();
+    } catch (const boost::system::system_error& e) {
+        LOG_ERROR("Failed to initialize connection: " << e.what());
+        close();
+        return;
+    }
+
     LOG_INFO(cnxString_ << "Create ClientConnection, timeout=" << 
clientConfiguration.getConnectionTimeout());
     if (clientConfiguration.isUseTls()) {
 #if BOOST_VERSION >= 105400
@@ -1505,9 +1512,11 @@ void ClientConnection::close(Result result) {
     }
     state_ = Disconnected;
     boost::system::error_code err;
-    socket_->close(err);
-    if (err) {
-        LOG_WARN(cnxString_ << "Failed to close socket: " << err.message());
+    if (socket_) {
+        socket_->close(err);
+        if (err) {
+            LOG_WARN(cnxString_ << "Failed to close socket: " << 
err.message());
+        }
     }
 
     if (tlsSocket_) {
@@ -1542,7 +1551,10 @@ void ClientConnection::close(Result result) {
         consumerStatsRequestTimer_.reset();
     }
 
-    connectTimeoutTask_->stop();
+    if (connectTimeoutTask_) {
+        connectTimeoutTask_->stop();
+        connectTimeoutTask_.reset();
+    }
 
     lock.unlock();
     LOG_INFO(cnxString_ << "Connection closed");

Reply via email to