This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 6d365c995a2a3ec3b48c57b1faaa40069e52d3c6
Author: Yunze Xu <[email protected]>
AuthorDate: Wed Apr 27 15:10:01 2022 +0800

    [C++] Wait until event loop terminates when closing the Client (#15316)

* [C++] Wait until event loops terminate when closing the Client

Fixes #13267

### Motivation

Unlike the Java client, the C++ client's `Client` has a `shutdown` method that is responsible for executing the following steps:
1. Call `shutdown` on all internal producers and consumers.
2. Close all connections in the pool.
3. Close all executors of the executor providers.

When an executor is closed, it calls `io_service::stop()`, which makes the event loop (`io_service::run()`) in another thread return as soon as possible. However, there is no wait operation. If a client fails to create a producer or consumer, the `close` method calls `shutdown`, which closes all executors immediately, and then the application exits. In this case, the detached event loop thread might not have exited yet, so valgrind detects a memory leak.

This memory leak can be avoided by sleeping for a while after `Client::close` returns, or if the application still has other work to do after that. However, we should still adopt the semantics that after `Client::shutdown` returns, all event loop threads have terminated.

### Modifications

- Add a timeout parameter to the `close` method of `ExecutorService` and `ExecutorServiceProvider`, used as the maximum blocking time if it's positive.
- Add a `TimeoutProcessor` helper class to update the remaining timeout after each call to a method that accepts the timeout parameter.
- Call `close` on all `ExecutorServiceProvider`s in `ClientImpl::shutdown` with a 500 ms timeout, which should be long enough. In addition, in the `handleClose` method, call `shutdown` in another thread to avoid a deadlock.

### Verifying this change

After applying this patch, the reproducing code in #13627 passes the valgrind check.
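For illustration only, here is a minimal sketch of the failing pattern (this is not the exact code attached to #13627; the service URL and topic name are placeholders):

```cpp
#include <pulsar/Client.h>

int main() {
    // Placeholder service URL and topic name.
    pulsar::Client client("pulsar://localhost:6650");
    pulsar::Producer producer;
    pulsar::Result res = client.createProducer("my-topic", producer);
    (void)res;  // the leak was reported whether or not creation succeeded
    // Before this patch, close() could return while the detached io_service
    // thread was still running, which valgrind reported as a leak. With this
    // patch, shutdown() waits (bounded by a timeout) for the event loops to exit.
    client.close();
    return 0;
}
```

Run under valgrind, such a program now ends with a clean leak summary: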
```
==3013== LEAK SUMMARY:
==3013==    definitely lost: 0 bytes in 0 blocks
==3013==    indirectly lost: 0 bytes in 0 blocks
==3013==      possibly lost: 0 bytes in 0 blocks
```

(cherry picked from commit cd78f39a92521f3847b022580a6e66e651b5cb4b)
---
 pulsar-client-cpp/lib/ClientImpl.cc         | 37 +++++++++++++++++-----
 pulsar-client-cpp/lib/ExecutorService.cc    | 33 +++++++++++++++-----
 pulsar-client-cpp/lib/ExecutorService.h     | 11 +++++--
 pulsar-client-cpp/lib/TimeUtils.h           | 48 +++++++++++++++++++++++++++++
 pulsar-client-cpp/tests/CustomLoggerTest.cc | 26 ++++++++++------
 5 files changed, 130 insertions(+), 25 deletions(-)

diff --git a/pulsar-client-cpp/lib/ClientImpl.cc b/pulsar-client-cpp/lib/ClientImpl.cc
index 0b07b6e9f2d..60e3e248c12 100644
--- a/pulsar-client-cpp/lib/ClientImpl.cc
+++ b/pulsar-client-cpp/lib/ClientImpl.cc
@@ -26,6 +26,7 @@
 #include "PartitionedConsumerImpl.h"
 #include "MultiTopicsConsumerImpl.h"
 #include "PatternMultiTopicsConsumerImpl.h"
+#include "TimeUtils.h"
 #include <pulsar/ConsoleLoggerFactory.h>
 #include <boost/algorithm/string/predicate.hpp>
 #include <sstream>
@@ -35,6 +36,7 @@
 #include <algorithm>
 #include <random>
 #include <mutex>
+#include <thread>
 #ifdef USE_LOG4CXX
 #include "Log4CxxLogger.h"
 #endif
@@ -538,13 +540,20 @@ void ClientImpl::handleClose(Result result, SharedInt numberOfOpenHandlers, Resu
         lock.unlock();
 
         LOG_DEBUG("Shutting down producers and consumers for client");
-        shutdown();
-        if (callback) {
-            if (closingError != ResultOk) {
-                LOG_DEBUG("Problem in closing client, could not close one or more consumers or producers");
+        // handleClose() is called in ExecutorService's event loop, while shutdown() tried to wait the event
+        // loop exits. So here we use another thread to call shutdown().
+        auto self = shared_from_this();
+        std::thread shutdownTask{[this, self, callback] {
+            shutdown();
+            if (callback) {
+                if (closingError != ResultOk) {
+                    LOG_DEBUG(
+                        "Problem in closing client, could not close one or more consumers or producers");
+                }
+                callback(closingError);
             }
-            callback(closingError);
-        }
+        }};
+        shutdownTask.detach();
     }
 }
 
@@ -580,11 +589,25 @@ void ClientImpl::shutdown() {
         return;
     }
     LOG_DEBUG("ConnectionPool is closed");
-    ioExecutorProvider_->close();
+
+    // 500ms as the timeout is long enough because ExecutorService::close calls io_service::stop() internally
+    // and waits until io_service::run() in another thread returns, which should be as soon as possible after
+    // stop() is called.
+    TimeoutProcessor<std::chrono::milliseconds> timeoutProcessor{500};
+
+    timeoutProcessor.tik();
+    ioExecutorProvider_->close(timeoutProcessor.getLeftTimeout());
+    timeoutProcessor.tok();
     LOG_DEBUG("ioExecutorProvider_ is closed");
+
+    timeoutProcessor.tik();
     listenerExecutorProvider_->close();
+    timeoutProcessor.tok();
     LOG_DEBUG("listenerExecutorProvider_ is closed");
+
+    timeoutProcessor.tik();
     partitionListenerExecutorProvider_->close();
+    timeoutProcessor.tok();
     LOG_DEBUG("partitionListenerExecutorProvider_ is closed");
 }
 
diff --git a/pulsar-client-cpp/lib/ExecutorService.cc b/pulsar-client-cpp/lib/ExecutorService.cc
index 9cfbd82881d..b9b5ed46478 100644
--- a/pulsar-client-cpp/lib/ExecutorService.cc
+++ b/pulsar-client-cpp/lib/ExecutorService.cc
@@ -21,6 +21,7 @@
 #include <boost/asio.hpp>
 #include <functional>
 #include <memory>
+#include "TimeUtils.h"
 
 #include "LogUtils.h"
 DECLARE_LOG_OBJECT()
@@ -29,7 +30,7 @@ namespace pulsar {
 
 ExecutorService::ExecutorService() {}
 
-ExecutorService::~ExecutorService() { close(); }
+ExecutorService::~ExecutorService() { close(0); }
 
 void ExecutorService::start() {
     auto self = shared_from_this();
@@ -37,11 +38,16 @@
         if (self->isClosed()) {
             return;
         }
+        LOG_INFO("Run io_service in a single thread");
         boost::system::error_code ec;
         self->getIOService().run(ec);
         if (ec) {
             LOG_ERROR("Failed to run io_service: " << ec.message());
+        } else {
+            LOG_INFO("Event loop of ExecutorService exits successfully");
         }
+        self->ioServiceDone_ = true;
+        self->cond_.notify_all();
     }};
     t.detach();
 }
@@ -79,13 +85,23 @@ DeadlineTimerPtr ExecutorService::createDeadlineTimer() {
     return DeadlineTimerPtr(new boost::asio::deadline_timer(io_service_));
 }
 
-void ExecutorService::close() {
+void ExecutorService::close(long timeoutMs) {
     bool expectedState = false;
    if (!closed_.compare_exchange_strong(expectedState, true)) {
         return;
     }
 
+    if (timeoutMs == 0) {  // non-blocking
+        io_service_.stop();
+        return;
+    }
+    std::unique_lock<std::mutex> lock{mutex_};
     io_service_.stop();
+    if (timeoutMs > 0) {
+        cond_.wait_for(lock, std::chrono::milliseconds(timeoutMs), [this] { return ioServiceDone_.load(); });
+    } else {  // < 0
+        cond_.wait(lock, [this] { return ioServiceDone_.load(); });
+    }
 }
 
 void ExecutorService::postWork(std::function<void(void)> task) { io_service_.post(task); }
@@ -106,14 +122,17 @@ ExecutorServicePtr ExecutorServiceProvider::get() {
     return executors_[idx];
 }
 
-void ExecutorServiceProvider::close() {
+void ExecutorServiceProvider::close(long timeoutMs) {
     Lock lock(mutex_);
-    for (ExecutorList::iterator it = executors_.begin(); it != executors_.end(); ++it) {
-        if (*it != NULL) {
-            (*it)->close();
+    TimeoutProcessor<std::chrono::milliseconds> timeoutProcessor{timeoutMs};
+    for (auto &&executor : executors_) {
+        timeoutProcessor.tik();
+        if (executor) {
+            executor->close(timeoutProcessor.getLeftTimeout());
         }
-        it->reset();
+        timeoutProcessor.tok();
+        executor.reset();
     }
 }
 
 } // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ExecutorService.h b/pulsar-client-cpp/lib/ExecutorService.h
index 6b0909194b7..e4cbb3ce62e 100644
--- a/pulsar-client-cpp/lib/ExecutorService.h
+++ b/pulsar-client-cpp/lib/ExecutorService.h
@@ -20,6 +20,8 @@
 #define _PULSAR_EXECUTOR_SERVICE_HEADER_
 
 #include <atomic>
+#include <condition_variable>
+#include <chrono>
 #include <memory>
 #include <boost/asio.hpp>
 #include <boost/asio/ssl.hpp>
@@ -50,7 +52,8 @@ class PULSAR_PUBLIC ExecutorService : public std::enable_shared_from_this<Execut
 
     DeadlineTimerPtr createDeadlineTimer();
     void postWork(std::function<void(void)> task);
-    void close();
+    // See TimeoutProcessor for the semantics of the parameter.
+    void close(long timeoutMs = 3000);
 
     IOService &getIOService() { return io_service_; }
     bool isClosed() const noexcept { return closed_; }
@@ -68,6 +71,9 @@ class PULSAR_PUBLIC ExecutorService : public std::enable_shared_from_this<Execut
     IOService::work work_{io_service_};
 
     std::atomic_bool closed_{false};
+    std::mutex mutex_;
+    std::condition_variable cond_;
+    std::atomic_bool ioServiceDone_{false};
 
     ExecutorService();
 
@@ -82,7 +88,8 @@ class PULSAR_PUBLIC ExecutorServiceProvider {
 
     ExecutorServicePtr get();
 
-    void close();
+    // See TimeoutProcessor for the semantics of the parameter.
+    void close(long timeoutMs = 3000);
 
    private:
     typedef std::vector<ExecutorServicePtr> ExecutorList;
diff --git a/pulsar-client-cpp/lib/TimeUtils.h b/pulsar-client-cpp/lib/TimeUtils.h
index 1da7d65923a..45157ae855b 100644
--- a/pulsar-client-cpp/lib/TimeUtils.h
+++ b/pulsar-client-cpp/lib/TimeUtils.h
@@ -19,6 +19,8 @@
 #pragma once
 
 #include <boost/date_time/local_time/local_time.hpp>
+#include <atomic>
+#include <chrono>
 
 #include <pulsar/defines.h>
 
@@ -33,4 +35,50 @@ class PULSAR_PUBLIC TimeUtils {
     static ptime now();
     static int64_t currentTimeMillis();
 };
+
+// This class processes a timeout with the following semantics:
+//  > 0: wait at most the timeout until a blocking operation completes
+// == 0: do not wait the blocking operation
+//  < 0: wait infinitely until a blocking operation completes.
+//
+// Here is a simple example usage:
+//
+// ```c++
+// // Wait at most 300 milliseconds
+// TimeoutProcessor<std::chrono::milliseconds> timeoutProcessor{300};
+// while (!allOperationsAreDone()) {
+//     timeoutProcessor.tik();
+//     // This method may block for some time
+//     performBlockingOperation(timeoutProcessor.getLeftTimeout());
+//     timeoutProcessor.tok();
+// }
+// ```
+//
+// The template argument is the same as std::chrono::duration.
+template <typename Duration>
+class TimeoutProcessor {
+   public:
+    using Clock = std::chrono::high_resolution_clock;
+
+    TimeoutProcessor(long timeout) : leftTimeout_(timeout) {}
+
+    long getLeftTimeout() const noexcept { return leftTimeout_; }
+
+    void tik() { before_ = Clock::now(); }
+
+    void tok() {
+        if (leftTimeout_ > 0) {
+            leftTimeout_ -= std::chrono::duration_cast<Duration>(Clock::now() - before_).count();
+            if (leftTimeout_ <= 0) {
+                // The timeout exceeds, getLeftTimeout() will return 0 to indicate we should not wait more
+                leftTimeout_ = 0;
+            }
+        }
+    }
+
+   private:
+    std::atomic_long leftTimeout_;
+    std::chrono::time_point<Clock> before_;
+};
+
 } // namespace pulsar
diff --git a/pulsar-client-cpp/tests/CustomLoggerTest.cc b/pulsar-client-cpp/tests/CustomLoggerTest.cc
index 0b4e76adcc4..bd80c312e3b 100644
--- a/pulsar-client-cpp/tests/CustomLoggerTest.cc
+++ b/pulsar-client-cpp/tests/CustomLoggerTest.cc
@@ -20,6 +20,7 @@
 #include <pulsar/ConsoleLoggerFactory.h>
 #include <LogUtils.h>
 #include <gtest/gtest.h>
+#include <atomic>
 #include <thread>
 
 using namespace pulsar;
@@ -28,35 +29,42 @@ static std::vector<std::string> logLines;
 
 class MyTestLogger : public Logger {
    public:
-    MyTestLogger() = default;
+    MyTestLogger(const std::string &fileName) : fileName_(fileName) {}
 
     bool isEnabled(Level level) override { return true; }
 
     void log(Level level, int line, const std::string &message) override {
         std::stringstream ss;
-        ss << " " << level << ":" << line << " " << message << std::endl;
+        ss << std::this_thread::get_id() << " " << level << " " << fileName_ << ":" << line << " " << message
+           << std::endl;
         logLines.emplace_back(ss.str());
     }
+
+   private:
+    const std::string fileName_;
 };
 
 class MyTestLoggerFactory : public LoggerFactory {
    public:
-    Logger *getLogger(const std::string &fileName) override { return logger; }
-
-   private:
-    MyTestLogger *logger = new MyTestLogger;
+    Logger *getLogger(const std::string &fileName) override { return new MyTestLogger(fileName); }
 };
 
 TEST(CustomLoggerTest, testCustomLogger) {
     // simulate new client created on a different thread (because logging factory is called once per thread)
-    auto testThread = std::thread([] {
+    std::atomic_int numLogLines{0};
+    auto testThread = std::thread([&numLogLines] {
         ClientConfiguration clientConfig;
         auto customLogFactory = new MyTestLoggerFactory();
         clientConfig.setLogger(customLogFactory);  // reset to previous log factory
         Client client("pulsar://localhost:6650", clientConfig);
         client.close();
-        ASSERT_EQ(logLines.size(), 7);
+        ASSERT_TRUE(logLines.size() > 0);
+        for (auto &&line : logLines) {
+            std::cout << line;
+            std::cout.flush();
+        }
+        numLogLines = logLines.size();
         LogUtils::resetLoggerFactory();
     });
     testThread.join();
 
@@ -65,7 +73,7 @@ TEST(CustomLoggerTest, testCustomLogger) {
     Client client("pulsar://localhost:6650", clientConfig);
     client.close();
     // custom logger didn't get any new lines
-    ASSERT_EQ(logLines.size(), 7);
+    ASSERT_EQ(logLines.size(), numLogLines);
 }
 
 TEST(CustomLoggerTest, testConsoleLoggerFactory) {
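
Side note on the deadlock mentioned in the commit message: `handleClose()` runs on an `ExecutorService` event loop, so it cannot itself block waiting for `io_service::run()` on that same thread to return. The following standalone Boost.Asio sketch (illustrative only, not Pulsar source; the names and the 500 ms / 1 s values are arbitrary) shows the same stop-and-wait pattern and why it has to be offloaded to another thread:

```cpp
#include <boost/asio.hpp>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

int main() {
    boost::asio::io_service io;
    boost::asio::io_service::work work{io};  // keep run() alive until stop()
    std::mutex mutex;
    std::condition_variable cond;
    bool loopDone = false;

    // Event loop thread, analogous to the one started by ExecutorService::start().
    std::thread loop{[&] {
        io.run();
        std::lock_guard<std::mutex> guard{mutex};
        loopDone = true;
        cond.notify_all();
    }};
    loop.detach();

    // Analogous to ExecutorService::close(timeoutMs): stop the loop and wait
    // (bounded by a timeout) until run() has actually returned.
    auto blockingShutdown = [&] {
        io.stop();
        std::unique_lock<std::mutex> lock{mutex};
        cond.wait_for(lock, std::chrono::milliseconds(500), [&] { return loopDone; });
    };

    // A handler running on the event loop must not call blockingShutdown()
    // directly: run() cannot return while the handler is still executing, so
    // the wait above would only ever time out. Offload it to another thread.
    io.post([&] { std::thread{blockingShutdown}.detach(); });

    // Sketch simplification: give the detached threads time to finish before
    // main() exits; real code would synchronize instead of sleeping.
    std::this_thread::sleep_for(std::chrono::seconds(1));
    return 0;
}
```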
