This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 1273bac6c2c91e764dac2a19a89166023acfcecf
Author: Yunze Xu <[email protected]>
AuthorDate: Thu Oct 21 14:29:43 2021 +0800

    Fix frequent segmentation fault of Python tests by refactoring ExecutorService (#12427)

    * Refactor ExecutorService to avoid usage of pointers

    * Remove unnecessary friend class

    * Fix CentOS 7 build

    * Fix PeriodicalTest

    (cherry picked from commit af0ea69a0368a2804cedbc93a8911b0aebf26dbc)
---
 pulsar-client-cpp/lib/ClientConnection.cc   | 10 +++---
 pulsar-client-cpp/lib/ExecutorService.cc    | 55 ++++++++++++++++-------------
 pulsar-client-cpp/lib/ExecutorService.h     | 42 ++++++++++------------
 pulsar-client-cpp/tests/PeriodicTaskTest.cc | 12 +++----
 4 files changed, 61 insertions(+), 58 deletions(-)

diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc
index 45ed188..6e6f2a3 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -162,11 +162,11 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:
       resolver_(executor_->createTcpResolver()),
       socket_(executor_->createSocket()),
 #if BOOST_VERSION >= 107000
-      strand_(boost::asio::make_strand(executor_->io_service_->get_executor())),
+      strand_(boost::asio::make_strand(executor_->getIOService().get_executor())),
 #elif BOOST_VERSION >= 106600
-      strand_(executor_->io_service_->get_executor()),
+      strand_(executor_->getIOService().get_executor()),
 #else
-      strand_(*(executor_->io_service_)),
+      strand_(executor_->getIOService()),
 #endif
       logicalAddress_(logicalAddress),
       physicalAddress_(physicalAddress),
@@ -183,7 +183,7 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:
 #if BOOST_VERSION >= 105400
         boost::asio::ssl::context ctx(boost::asio::ssl::context::tlsv12_client);
 #else
-        boost::asio::ssl::context ctx(*executor_->io_service_, boost::asio::ssl::context::tlsv1_client);
+        boost::asio::ssl::context ctx(executor_->getIOService(), boost::asio::ssl::context::tlsv1_client);
 #endif
         Url serviceUrl;
         Url::parse(physicalAddress, serviceUrl);
@@ -240,7 +240,7 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:
             }
         }
 
-        tlsSocket_ = executor_->createTlsSocket(socket_, ctx);
+        tlsSocket_ = ExecutorService::createTlsSocket(socket_, ctx);
 
         LOG_DEBUG("TLS SNI Host: " << serviceUrl.host());
         if (!SSL_set_tlsext_host_name(tlsSocket_->native_handle(), serviceUrl.host().c_str())) {
diff --git a/pulsar-client-cpp/lib/ExecutorService.cc b/pulsar-client-cpp/lib/ExecutorService.cc
index 4db3112..9cfbd82 100644
--- a/pulsar-client-cpp/lib/ExecutorService.cc
+++ b/pulsar-client-cpp/lib/ExecutorService.cc
@@ -27,22 +27,40 @@ DECLARE_LOG_OBJECT()
 
 namespace pulsar {
 
-ExecutorService::ExecutorService()
-    : io_service_(new boost::asio::io_service()),
-      work_(new BackgroundWork(*io_service_)),
-      worker_(std::bind(&ExecutorService::startWorker, this, io_service_)) {}
+ExecutorService::ExecutorService() {}
 
 ExecutorService::~ExecutorService() { close(); }
 
-void ExecutorService::startWorker(std::shared_ptr<boost::asio::io_service> io_service) { io_service_->run(); }
+void ExecutorService::start() {
+    auto self = shared_from_this();
+    std::thread t{[self] {
+        if (self->isClosed()) {
+            return;
+        }
+        boost::system::error_code ec;
+        self->getIOService().run(ec);
+        if (ec) {
+            LOG_ERROR("Failed to run io_service: " << ec.message());
+        }
+    }};
+    t.detach();
+}
+
+ExecutorServicePtr ExecutorService::create() {
+    // make_shared cannot access the private constructor, so we need to expose the private constructor via a
+    // derived class.
+    struct ExecutorServiceImpl : public ExecutorService {};
+
+    auto executor = std::make_shared<ExecutorServiceImpl>();
+    executor->start();
+    return std::static_pointer_cast<ExecutorService>(executor);
+}
 
 /*
  *  factory method of boost::asio::ip::tcp::socket associated with io_service_ instance
  *  @ returns shared_ptr to this socket
  */
-SocketPtr ExecutorService::createSocket() {
-    return SocketPtr(new boost::asio::ip::tcp::socket(*io_service_));
-}
+SocketPtr ExecutorService::createSocket() { return SocketPtr(new boost::asio::ip::tcp::socket(io_service_)); }
 
 TlsSocketPtr ExecutorService::createTlsSocket(SocketPtr &socket, boost::asio::ssl::context &ctx) {
     return std::shared_ptr<boost::asio::ssl::stream<boost::asio::ip::tcp::socket &> >(
@@ -54,11 +72,11 @@ TlsSocketPtr ExecutorService::createTlsSocket(SocketPtr &socket, boost::asio::ss
  *  @returns shraed_ptr to resolver object
  */
 TcpResolverPtr ExecutorService::createTcpResolver() {
-    return TcpResolverPtr(new boost::asio::ip::tcp::resolver(*io_service_));
+    return TcpResolverPtr(new boost::asio::ip::tcp::resolver(io_service_));
 }
 
 DeadlineTimerPtr ExecutorService::createDeadlineTimer() {
-    return DeadlineTimerPtr(new boost::asio::deadline_timer(*io_service_));
+    return DeadlineTimerPtr(new boost::asio::deadline_timer(io_service_));
 }
 
 void ExecutorService::close() {
@@ -67,21 +85,10 @@ void ExecutorService::close() {
         return;
     }
 
-    io_service_->stop();
-    work_.reset();
-    // Detach the worker thread instead of join to avoid potential deadlock
-    if (worker_.joinable()) {
-        try {
-            worker_.detach();
-        } catch (const std::system_error &e) {
-            // This condition will happen if we're forking the process, therefore the thread was not ported to
-            // the child side of the fork and the detach would be failing.
-            LOG_DEBUG("Failed to detach thread: " << e.what());
-        }
-    }
+    io_service_.stop();
 }
 
-void ExecutorService::postWork(std::function<void(void)> task) { io_service_->post(task); }
+void ExecutorService::postWork(std::function<void(void)> task) { io_service_.post(task); }
 
 /////////////////////
 
@@ -93,7 +100,7 @@ ExecutorServicePtr ExecutorServiceProvider::get() {
 
     int idx = executorIdx_++ % executors_.size();
     if (!executors_[idx]) {
-        executors_[idx] = std::make_shared<ExecutorService>();
+        executors_[idx] = ExecutorService::create();
     }
 
     return executors_[idx];
diff --git a/pulsar-client-cpp/lib/ExecutorService.h b/pulsar-client-cpp/lib/ExecutorService.h
index 6746936..6b09091 100644
--- a/pulsar-client-cpp/lib/ExecutorService.h
+++ b/pulsar-client-cpp/lib/ExecutorService.h
@@ -25,7 +25,6 @@
 #include <boost/asio/ssl.hpp>
 #include <functional>
 #include <thread>
-#include <boost/noncopyable.hpp>
 #include <mutex>
 #include <pulsar/defines.h>
 
@@ -34,51 +33,48 @@ typedef std::shared_ptr<boost::asio::ip::tcp::socket> SocketPtr;
 typedef std::shared_ptr<boost::asio::ssl::stream<boost::asio::ip::tcp::socket &> > TlsSocketPtr;
 typedef std::shared_ptr<boost::asio::ip::tcp::resolver> TcpResolverPtr;
 typedef std::shared_ptr<boost::asio::deadline_timer> DeadlineTimerPtr;
-class PULSAR_PUBLIC ExecutorService : private boost::noncopyable {
-    friend class ClientConnection;
-
+class PULSAR_PUBLIC ExecutorService : public std::enable_shared_from_this<ExecutorService> {
    public:
-    ExecutorService();
+    using IOService = boost::asio::io_service;
+    using SharedPtr = std::shared_ptr<ExecutorService>;
+
+    static SharedPtr create();
     ~ExecutorService();
 
+    ExecutorService(const ExecutorService &) = delete;
+    ExecutorService &operator=(const ExecutorService &) = delete;
+
     SocketPtr createSocket();
-    TlsSocketPtr createTlsSocket(SocketPtr &socket, boost::asio::ssl::context &ctx);
+    static TlsSocketPtr createTlsSocket(SocketPtr &socket, boost::asio::ssl::context &ctx);
     TcpResolverPtr createTcpResolver();
     DeadlineTimerPtr createDeadlineTimer();
     void postWork(std::function<void(void)> task);
+
     void close();
 
-    boost::asio::io_service &getIOService() { return *io_service_; }
+    IOService &getIOService() { return io_service_; }
+    bool isClosed() const noexcept { return closed_; }
 
    private:
     /*
-     *  only called once and within lock so no need to worry about thread-safety
-     */
-    void startWorker(std::shared_ptr<boost::asio::io_service> io_service);
-
-    /*
      * io_service is our interface to os, io object schedule async ops on this object
      */
-    std::shared_ptr<boost::asio::io_service> io_service_;
+    IOService io_service_;
 
     /*
      * work will not let io_service.run() return even after it has finished work
      * it will keep it running in the background so we don't have to take care of it
      */
-    typedef boost::asio::io_service::work BackgroundWork;
-    std::unique_ptr<BackgroundWork> work_;
-
-    /*
-     * worker thread which runs until work object is destroyed, it's running io_service::run in
-     * background invoking async handlers as they are finished and result is available from
-     * io_service
-     */
-    std::thread worker_;
+    IOService::work work_{io_service_};
 
     std::atomic_bool closed_{false};
+
+    ExecutorService();
+
+    void start();
 };
 
-typedef std::shared_ptr<ExecutorService> ExecutorServicePtr;
+using ExecutorServicePtr = ExecutorService::SharedPtr;
 
 class PULSAR_PUBLIC ExecutorServiceProvider {
    public:
diff --git a/pulsar-client-cpp/tests/PeriodicTaskTest.cc b/pulsar-client-cpp/tests/PeriodicTaskTest.cc
index 11c1c62..2c1da70 100644
--- a/pulsar-client-cpp/tests/PeriodicTaskTest.cc
+++ b/pulsar-client-cpp/tests/PeriodicTaskTest.cc
@@ -29,11 +29,11 @@ DECLARE_LOG_OBJECT()
 using namespace pulsar;
 
 TEST(PeriodicTaskTest, testCountdownTask) {
-    ExecutorService executor;
+    auto executor = ExecutorService::create();
 
     std::atomic_int count{5};
 
-    auto task = std::make_shared<PeriodicTask>(executor.getIOService(), 200);
+    auto task = std::make_shared<PeriodicTask>(executor->getIOService(), 200);
     task->setCallback([task, &count](const PeriodicTask::ErrorCode& ec) {
         if (--count <= 0) {
             task->stop();
@@ -56,13 +56,13 @@ TEST(PeriodicTaskTest, testCountdownTask) {
     ASSERT_EQ(count.load(), 0);
     task->stop();
 
-    executor.close();
+    executor->close();
 }
 
 TEST(PeriodicTaskTest, testNegativePeriod) {
-    ExecutorService executor;
+    auto executor = ExecutorService::create();
 
-    auto task = std::make_shared<PeriodicTask>(executor.getIOService(), -1);
+    auto task = std::make_shared<PeriodicTask>(executor->getIOService(), -1);
     std::atomic_bool callbackTriggered{false};
     task->setCallback([&callbackTriggered](const PeriodicTask::ErrorCode& ec) { callbackTriggered = true; });
 
@@ -71,5 +71,5 @@ TEST(PeriodicTaskTest, testNegativePeriod) {
     ASSERT_EQ(callbackTriggered.load(), false);
     task->stop();
 
-    executor.close();
+    executor->close();
 }
