Copilot commented on code in PR #551: URL: https://github.com/apache/pulsar-client-cpp/pull/551#discussion_r2922515306
########## tests/ClientTest.cc: ########## @@ -42,6 +46,81 @@ using testing::AtLeast; static std::string lookupUrl = "pulsar://localhost:6650"; static std::string adminUrl = "http://localhost:8080/"; +namespace { + +class SilentTcpServer { + public: + SilentTcpServer() + : acceptor_(ioContext_, ASIO::ip::tcp::endpoint(ASIO::ip::tcp::v4(), 0)), + acceptedFuture_(acceptedPromise_.get_future()) {} + + ~SilentTcpServer() { stop(); } + + int getPort() const { return acceptor_.local_endpoint().port(); } + + void start() { + serverThread_ = std::thread([this] { + socket_.reset(new ASIO::ip::tcp::socket(ioContext_)); + + ASIO_ERROR acceptError; + acceptor_.accept(*socket_, acceptError); + acceptedPromise_.set_value(acceptError); + + std::unique_lock<std::mutex> lock(mutex_); + cond_.wait(lock, [this] { return stopped_; }); + lock.unlock(); + + if (socket_) { + ASIO_ERROR closeError; + socket_->close(closeError); + } + + ASIO_ERROR closeError; + acceptor_.close(closeError); + }); + } + + bool waitUntilAccepted(std::chrono::milliseconds timeout) const { + return acceptedFuture_.wait_for(timeout) == std::future_status::ready; + } + + ASIO_ERROR acceptedError() const { return acceptedFuture_.get(); } + + void stop() { + { + std::lock_guard<std::mutex> lock(mutex_); + if (stopped_) { + return; + } + stopped_ = true; + } + + ASIO_ERROR closeError; + acceptor_.close(closeError); + if (socket_) { + socket_->close(closeError); + } Review Comment: `SilentTcpServer::stop()` calls `acceptor_.close()` / `socket_->close()` from a different thread while the server thread may be blocked in `acceptor_.accept()` (and later also closes the same acceptor). Asio I/O objects (such as this acceptor and socket) are not safe for concurrent use from multiple threads, so this cross-thread close can itself be racy/flaky. 
Consider running accept/close on the same thread (e.g., use `async_accept` + `io_context::run()` in `serverThread_`, and `ASIO::post`/`dispatch` a shutdown that closes the acceptor/socket), or otherwise ensure only the server thread touches the acceptor/socket. ########## lib/ExecutorService.h: ########## @@ -62,7 +66,19 @@ class PULSAR_PUBLIC ExecutorService : public std::enable_shared_from_this<Execut TcpResolverPtr createTcpResolver(); // throws std::runtime_error if failed DeadlineTimerPtr createDeadlineTimer(); - void postWork(std::function<void(void)> task); + + // Execute the task in the event loop thread asynchronously, i.e. the task will be put in the event loop + // queue and executed later. + template <typename T> + void postWork(T &&task) { + ASIO::post(io_context_, std::forward<T>(task)); + } + + // Different from `postWork`, if it's already in the event loop, execute the task immediately + template <typename T> + void dispatch(T &&task) { + ASIO::dispatch(io_context_, std::forward<T>(task)); + } Review Comment: `postWork()` / `dispatch()` use `std::forward`, but this header does not include `<utility>`. Relying on transitive includes can break builds on some toolchains/standard library combinations. Add `#include <utility>` (and similarly anywhere else `std::forward` is used) to make the header self-contained. ########## lib/ExecutorService.h: ########## @@ -62,7 +66,19 @@ class PULSAR_PUBLIC ExecutorService : public std::enable_shared_from_this<Execut TcpResolverPtr createTcpResolver(); // throws std::runtime_error if failed DeadlineTimerPtr createDeadlineTimer(); - void postWork(std::function<void(void)> task); + + // Execute the task in the event loop thread asynchronously, i.e. the task will be put in the event loop + // queue and executed later. 
+ template <typename T> + void postWork(T &&task) { + ASIO::post(io_context_, std::forward<T>(task)); + } Review Comment: `ExecutorService::postWork` was changed from a non-template member (with a compiled symbol) to a templated inline function. Since `ExecutorService` is marked `PULSAR_PUBLIC`, removing the old overload can break ABI for downstream code linking against a previously-built shared library. Consider keeping the original `void postWork(std::function<void()>)` as an overload (it can forward to `ASIO::post`) while adding the templated version for flexibility. ########## lib/ClientConnection.h: ########## @@ -297,26 +319,26 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien } template <typename ConstBufferSequence, typename WriteHandler> - inline void asyncWrite(const ConstBufferSequence& buffers, WriteHandler handler) { + inline void asyncWrite(const ConstBufferSequence& buffers, WriteHandler&& handler) { if (isClosed()) { return; } if (tlsSocket_) { - ASIO::async_write(*tlsSocket_, buffers, ASIO::bind_executor(strand_, handler)); + ASIO::async_write(*tlsSocket_, buffers, std::forward<WriteHandler>(handler)); } else { ASIO::async_write(*socket_, buffers, handler); } Review Comment: `asyncWrite()` takes `WriteHandler&&` but the non-TLS branch passes `handler` as an lvalue (`ASIO::async_write(*socket_, ..., handler)`), which forces a copy and will fail for move-only handlers. Use `std::forward<WriteHandler>(handler)` in both branches. Also, since this uses `std::forward`, ensure this header includes `<utility>` instead of relying on transitive includes. 
########## lib/ConnectionPool.cc: ########## @@ -54,16 +54,43 @@ bool ConnectionPool::close() { return false; } + std::vector<ClientConnectionPtr> connectionsToClose; + // ClientConnection::close() will remove the connection from the pool, which is not allowed when iterating + // over a map, so we store the connections to close in a vector first and don't iterate the pool when + // closing the connections. std::unique_lock<std::recursive_mutex> lock(mutex_); + connectionsToClose.reserve(pool_.size()); + for (auto&& kv : pool_) { + connectionsToClose.emplace_back(kv.second); + } + pool_.clear(); + lock.unlock(); - for (auto cnxIt = pool_.begin(); cnxIt != pool_.end(); cnxIt++) { - auto& cnx = cnxIt->second; + for (auto&& cnx : connectionsToClose) { if (cnx) { - // The 2nd argument is false because removing a value during the iteration will cause segfault - cnx->close(ResultDisconnected, false); + // Close with a fatal error to not let client retry + auto& future = cnx->close(ResultAlreadyClosed); + using namespace std::chrono_literals; + if (auto status = future.wait_for(5s); status != std::future_status::ready) { + LOG_WARN("Connection close timed out for " << cnx.get()->cnxString()); + } + if (cnx.use_count() > 1) { + // There are some asynchronous operations that hold the reference on the connection, we should + // wait for them to finish. Otherwise, `io_context::stop()` will be called in + // `ClientImpl::shutdown()` when closing the `ExecutorServiceProvider`. Then + // `io_context::run()` will return and the `io_context` object will be destroyed. In this + // case, if there is any pending handler, it will crash. 
+ for (int i = 0; i < 500 && cnx.use_count() > 1; i++) { + std::this_thread::sleep_for(10ms); + } + if (cnx.use_count() > 1) { + LOG_WARN("Connection still has " << (cnx.use_count() - 1) + << " references after waiting for 5 seconds for " + << cnx.get()->cnxString()); + } Review Comment: `ConnectionPool::close()` waits for references to drop by polling `cnx.use_count()` with `sleep_for(10ms)` up to 5 seconds per connection. With multiple connections this can make shutdown take O(N * 5s) in the worst case and still isn’t a precise indicator of “no pending handlers” (other long-lived refs may keep the count > 1). Consider replacing this with an explicit “pending async ops” counter / completion signal in `ClientConnection`, or applying a single global time budget across all connections rather than per-connection busy-waiting. ########## lib/ClientConnection.cc: ########## @@ -581,73 +576,68 @@ void ClientConnection::tcpConnectAsync() { Url service_url; std::string hostUrl = isSniProxy_ ? proxyServiceUrl_ : physicalAddress_; if (!Url::parse(hostUrl, service_url)) { - LOG_ERROR(cnxString_ << "Invalid Url, unable to parse: " << err << " " << err.message()); + LOG_ERROR(cnxString() << "Invalid Url, unable to parse: " << err << " " << err.message()); close(); return; } if (service_url.protocol() != "pulsar" && service_url.protocol() != "pulsar+ssl") { - LOG_ERROR(cnxString_ << "Invalid Url protocol '" << service_url.protocol() - << "'. Valid values are 'pulsar' and 'pulsar+ssl'"); + LOG_ERROR(cnxString() << "Invalid Url protocol '" << service_url.protocol() + << "'. 
Valid values are 'pulsar' and 'pulsar+ssl'"); close(); return; } - LOG_DEBUG(cnxString_ << "Resolving " << service_url.host() << ":" << service_url.port()); + LOG_DEBUG(cnxString() << "Resolving " << service_url.host() << ":" << service_url.port()); - auto weakSelf = weak_from_this(); - resolver_->async_resolve(service_url.host(), std::to_string(service_url.port()), - [weakSelf](auto err, const auto& results) { - auto self = weakSelf.lock(); - if (self) { - self->handleResolve(err, results); - } - }); + resolver_->async_resolve( + service_url.host(), std::to_string(service_url.port()), + [this, self{shared_from_this()}](auto err, const auto& results) { handleResolve(err, results); }); } void ClientConnection::handleResolve(ASIO_ERROR err, const tcp::resolver::results_type& results) { if (err) { - std::string hostUrl = isSniProxy_ ? cnxString_ : proxyServiceUrl_; + std::string hostUrl = isSniProxy_ ? cnxString() : proxyServiceUrl_; LOG_ERROR(hostUrl << "Resolve error: " << err << " : " << err.message()); close(); return; } if (!results.empty()) { - LOG_DEBUG(cnxString_ << "Resolved " << results.size() << " endpoints"); + LOG_DEBUG(cnxString() << "Resolved " << results.size() << " endpoints"); for (const auto& entry : results) { const auto& ep = entry.endpoint(); - LOG_DEBUG(cnxString_ << " " << ep.address().to_string() << ":" << ep.port()); + LOG_DEBUG(cnxString() << " " << ep.address().to_string() << ":" << ep.port()); } } - auto weakSelf = weak_from_this(); - connectTimeoutTask_->setCallback([weakSelf, results = tcp::resolver::results_type(results)]( - const PeriodicTask::ErrorCode& ec) { - ClientConnectionPtr ptr = weakSelf.lock(); - if (!ptr) { - LOG_DEBUG("Connect timeout callback skipped: connection was already destroyed"); - return; - } - - if (ptr->state_ != Ready) { - LOG_ERROR(ptr->cnxString_ << "Connection to " << results << " was not established in " - << ptr->connectTimeoutTask_->getPeriodMs() << " ms, close the socket"); - PeriodicTask::ErrorCode err; 
- ptr->socket_->close(err); - if (err) { - LOG_WARN(ptr->cnxString_ << "Failed to close socket: " << err.message()); + // Acquire the lock to prevent the race: + // 1. thread 1: isClosed() returns false + // 2. thread 2: call `connectTimeoutTask_->stop()` and `connectTimeoutTask_.reset()` in `close()` + // 3. thread 1: call `connectTimeoutTask_->setCallback()` and `connectTimeoutTask_->start()` + // Then the self captured in the callback of `connectTimeoutTask_` would be kept alive unexpectedly and + // cannot be cancelled until the executor is destroyed. + std::lock_guard lock{mutex_}; + if (isClosed() || !connectTimeoutTask_) { + return; + } + connectTimeoutTask_->setCallback( + [this, self{shared_from_this()}, + results = tcp::resolver::results_type(results)](const PeriodicTask::ErrorCode& ec) { + if (state_ != Ready) { + LOG_ERROR(cnxString() << "Connection to " << results << " was not established in " + << connectTimeoutTask_->getPeriodMs() << " ms"); + close(); + } else { + connectTimeoutTask_->stop(); } Review Comment: The connect-timeout callback set in `handleResolve()` dereferences `connectTimeoutTask_` (`getPeriodMs()` / `stop()`) without holding `mutex_`, but `close()` and `handlePulsarConnected()` now reset `connectTimeoutTask_` under the mutex. This creates a race where the timer callback can run concurrently with `connectTimeoutTask_.reset()` and crash on a null pointer. A safer approach is to capture the current `connectTimeoutTask_` (shared_ptr) or `periodMs` by value in the callback and operate on that captured object/value instead of re-reading the member pointer. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
