Copilot commented on code in PR #551:
URL: https://github.com/apache/pulsar-client-cpp/pull/551#discussion_r2905462383


##########
lib/ConnectionPool.cc:
##########
@@ -61,8 +63,18 @@ bool ConnectionPool::close() {
         if (cnx) {
            // The 2nd argument is false because removing a value during the iteration will cause segfault
             cnx->close(ResultDisconnected, false);
+            for (int i = 0; i < 5000 && cnx->pendingOperations() > 0; i++) {
+                using namespace std::chrono_literals;
+                std::this_thread::sleep_for(1ms);
+            }

Review Comment:
   `ConnectionPool::close()` holds `mutex_` while it waits for 
`pendingOperations()` to reach 0. If any in-flight async handler calls 
`ClientConnection::close()` with the default `detach=true` (e.g. the 
connect/read/write error paths), that handler tries to acquire the same pool 
mutex in `ConnectionPool::remove()`. The handler blocks, `pendingOperations_` 
never decrements, and the result is either a deadlock or always hitting the 
5s timeout (5000 iterations of 1ms). Consider snapshotting the connections 
under the lock, releasing the pool lock before closing/waiting, and 
re-locking only briefly to clear the map.



##########
lib/ClientConnection.cc:
##########
@@ -1297,16 +1321,24 @@ void ClientConnection::close(Result result, bool detach) {
     if (isClosed()) {
         return;
     }
-    state_ = Disconnected;
 
     if (socket_) {
         ASIO_ERROR err;
-        socket_->shutdown(ASIO::socket_base::shutdown_both, err);
-        socket_->close(err);
-        if (err) {
-            LOG_WARN(cnxString_ << "Failed to close socket: " << err.message());
+        if (state_ != Pending) {
+            socket_->shutdown(ASIO::socket_base::shutdown_both, err);
+            socket_->close(err);
+            if (err) {
+                LOG_WARN(cnxString_ << "Failed to close socket: " << err.message());
+            }
+        } else {
+            // There is an ongoing connect operation, the socket will be closed after the operation succeeds
+            // in `handleTcpConnected`
+            LOG_DEBUG(
+                cnxString_

Review Comment:
   When `state_ == Pending`, `close()` skips shutting down and closing the 
socket, and later stops `connectTimeoutTask_`. That can leave the in-flight 
`async_connect` running until the OS-level connect timeout, which keeps 
`pendingOperations_` > 0 and allows `ConnectionPool::close()` or executor 
shutdown to proceed while the connect handler is still outstanding. To make 
the connect complete or cancel promptly, consider explicitly cancelling or 
closing the socket (or cancelling the resolver/connect), while keeping the 
socket and executor alive until the completion handler has run.



##########
lib/ClientConnection.h:
##########
@@ -386,8 +404,12 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
     typedef std::unordered_map<uint64_t, GetSchemaRequest> PendingGetSchemaMap;
     PendingGetSchemaMap pendingGetSchemaRequests_;
 
-    mutable std::mutex mutex_;
-    typedef std::unique_lock<std::mutex> Lock;
+    // Most accesses to this class happen in the event loop thread, but the `close` method could be called in
+    // a different thread by `ConnectionPool`, so we have to guard the state of the connection with a mutex.
+    // However, `close` could be called when a mutex is held, so we have to use a recursive mutex to avoid
+    // deadlock.
+    mutable std::recursive_mutex mutex_;
+    typedef std::unique_lock<std::recursive_mutex> Lock;

Review Comment:
   `ClientConnection.h` now declares `std::recursive_mutex` / 
`std::unique_lock<std::recursive_mutex>` but this header does not include 
`<mutex>`. Relying on transitive includes is brittle and can break builds 
depending on include order; add an explicit `#include <mutex>` in this header.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
