This is an automated email from the ASF dual-hosted git repository.

guangning pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 16fff189dd932a2fe4be0bbfea8f7f40c030f846
Author: Matteo Merli <[email protected]>
AuthorDate: Sun Feb 9 17:38:41 2020 -0800

    [C++] Fixed memory corruption on ExecutorService destructor (#6270)
    
    ### Motivation
    
    In the C++ test CI jobs, some tests fail spuriously with segfaults.
    
    Analyzing the test execution with valgrind shows that the thread running the
    Boost.Asio event loop accesses the `io_service` after it has already been
    destroyed.
    
    To ensure that the `io_service` remains valid until the worker thread exits, we
    pass a `shared_ptr` to the thread, which keeps the `io_service` alive for the
    thread's whole lifetime.
    
    Example of valgrind errors:
    
    ```
    ==10034== Invalid read of size 4
    ==10034==    at 0x4BCB784: __pthread_mutex_unlock_usercnt 
(pthread_mutex_unlock.c:40)
    ==10034==    by 0x4BCB784: pthread_mutex_unlock (pthread_mutex_unlock.c:357)
    ==10034==    by 0x197DB9: boost::asio::detail::posix_mutex::unlock() 
(posix_mutex.hpp:58)
    ==10034==    by 0x199492: 
boost::asio::detail::conditionally_enabled_mutex::scoped_lock::~scoped_lock() 
(conditionally_enabled_mutex.hpp:66)
    ==10034==    by 0x4F03895: 
boost::asio::detail::scheduler::run(boost::system::error_code&) 
(scheduler.ipp:151)
    ==10034==    by 0x4F03F8B: boost::asio::io_context::run() 
(io_context.ipp:62)
    ==10034==    by 0x4FDE872: 
pulsar::ExecutorService::startWorker(std::shared_ptr<boost::asio::io_context>) 
(ExecutorService.cc:39)
    ==10034==    by 0x4FE99A3: void std::__invoke_impl<void, void 
(pulsar::ExecutorService::*&)(std::shared_ptr<boost::asio::io_context>), 
pulsar::ExecutorService*&, decltype(nullptr)&>(std::__invoke_memfun_deref, void 
(pulsar::ExecutorService::*&)(std::shared_ptr<boost::asio::io_context>), 
pulsar::ExecutorService*&, decltype(nullptr)&) (invoke.h:73)
    ==10034==    by 0x4FE986D: std::__invoke_result<void 
(pulsar::ExecutorService::*&)(std::shared_ptr<boost::asio::io_context>), 
pulsar::ExecutorService*&, decltype(nullptr)&>::type std::__invoke<void 
(pulsar::ExecutorService::*&)(std::shared_ptr<boost::asio::io_context>), 
pulsar::ExecutorService*&, decltype(nullptr)&>(void 
(pulsar::ExecutorService::*&)(std::shared_ptr<boost::asio::io_context>), 
pulsar::ExecutorService*&, decltype(nullptr)&) (invoke.h:95)
    ==10034==    by 0x4FE9767: void std::_Bind<void 
(pulsar::ExecutorService::*(pulsar::ExecutorService*, 
decltype(nullptr)))(std::shared_ptr<boost::asio::io_context>)>::__call<void, , 
0ul, 1ul>(std::tuple<>&&, std::_Index_tuple<0ul, 1ul>) (functional:400)
    ==10034==    by 0x4FE94A0: void std::_Bind<void 
(pulsar::ExecutorService::*(pulsar::ExecutorService*, 
decltype(nullptr)))(std::shared_ptr<boost::asio::io_context>)>::operator()<, 
void>() (functional:484)
    ==10034==    by 0x4FE9095: 
boost::asio::detail::posix_thread::func<std::_Bind<void 
(pulsar::ExecutorService::*(pulsar::ExecutorService*, 
decltype(nullptr)))(std::shared_ptr<boost::asio::io_context>)> >::run() 
(posix_thread.hpp:86)
    ==10034==    by 0x4F03E00: boost_asio_detail_posix_thread_function 
(posix_thread.ipp:74)
    ==10034==  Address 0x8896d08 is 72 bytes inside a block of size 240 free'd
    ==10034==    at 0x483BFBF: operator delete(void*) (in 
/usr/lib/x86_64-linux-gnu/valgrind/vgpreload_memcheck-amd64-linux.so)
    ==10034==    by 0x1A0001: boost::asio::detail::scheduler::~scheduler() 
(scheduler.hpp:38)
    ==10034==    by 0x198E5B: 
boost::asio::detail::service_registry::destroy(boost::asio::execution_context::service*)
 (service_registry.ipp:110)
    ==10034==    by 0x198D94: 
boost::asio::detail::service_registry::destroy_services() 
(service_registry.ipp:54)
    ==10034==    by 0x199294: boost::asio::execution_context::destroy() 
(execution_context.ipp:46)
    ==10034==    by 0x199222: 
boost::asio::execution_context::~execution_context() (execution_context.ipp:35)
    ==10034==    by 0x19B90F: boost::asio::io_context::~io_context() 
(io_context.ipp:55)
    ==10034==    by 0x1B3B7F: std::_Sp_counted_ptr<boost::asio::io_context*, 
(__gnu_cxx::_Lock_policy)2>::_M_dispose() (shared_ptr_base.h:377)
    ==10034==    by 0x1A283B: 
std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release() 
(shared_ptr_base.h:155)
    ==10034==    by 0x19EC34: 
std::__shared_count<(__gnu_cxx::_Lock_policy)2>::~__shared_count() 
(shared_ptr_base.h:730)
    ==10034==    by 0x19D123: std::__shared_ptr<boost::asio::io_context, 
(__gnu_cxx::_Lock_policy)2>::~__shared_ptr() (shared_ptr_base.h:1169)
    ==10034==    by 0x19D143: 
std::shared_ptr<boost::asio::io_context>::~shared_ptr() (shared_ptr.h:103)
    ==10034==  Block was alloc'd at
    ==10034==    at 0x483AE63: operator new(unsigned long) (in 
/usr/lib/x86_64-linux-gnu/valgrind/vgpreload_memcheck-amd64-linux.so)
    ==10034==    by 0x19B7DA: boost::asio::io_context::io_context() 
(io_context.ipp:38)
    ==10034==    by 0x4FDE622: pulsar::ExecutorService::ExecutorService() 
(ExecutorService.cc:31)
    ==10034==    by 0x4FE871C: void 
__gnu_cxx::new_allocator<pulsar::ExecutorService>::construct<pulsar::ExecutorService>(pulsar::ExecutorService*)
 (new_allocator.h:147)
    ==10034==    by 0x4FE8570: void 
std::allocator_traits<std::allocator<pulsar::ExecutorService> 
>::construct<pulsar::ExecutorService>(std::allocator<pulsar::ExecutorService>&, 
pulsar::ExecutorService*) (alloc_traits.h:484)
    ==10034==    by 0x4FE807F: 
std::_Sp_counted_ptr_inplace<pulsar::ExecutorService, 
std::allocator<pulsar::ExecutorService>, 
(__gnu_cxx::_Lock_policy)2>::_Sp_counted_ptr_inplace<>(std::allocator<pulsar::ExecutorService>)
 (shared_ptr_base.h:548)
    ==10034==    by 0x4FE77AD: 
std::__shared_count<(__gnu_cxx::_Lock_policy)2>::__shared_count<pulsar::ExecutorService,
 std::allocator<pulsar::ExecutorService>>(pulsar::ExecutorService*&, 
std::_Sp_alloc_shared_tag<std::allocator<pulsar::ExecutorService> >) 
(shared_ptr_base.h:679)
    ==10034==    by 0x4FE6E3F: std::__shared_ptr<pulsar::ExecutorService, 
(__gnu_cxx::_Lock_policy)2>::__shared_ptr<std::allocator<pulsar::ExecutorService>>(std::_Sp_alloc_shared_tag<std::allocator<pulsar::ExecutorService>
 >) (shared_ptr_base.h:1344)
    ==10034==    by 0x4FE62C8: 
std::shared_ptr<pulsar::ExecutorService>::shared_ptr<std::allocator<pulsar::ExecutorService>>(std::_Sp_alloc_shared_tag<std::allocator<pulsar::ExecutorService>
 >) (shared_ptr.h:359)
    ==10034==    by 0x4FE4DEF: std::shared_ptr<pulsar::ExecutorService> 
std::allocate_shared<pulsar::ExecutorService, 
std::allocator<pulsar::ExecutorService>>(std::allocator<pulsar::ExecutorService>
 const&) (shared_ptr.h:702)
    ==10034==    by 0x4FE3608: std::shared_ptr<pulsar::ExecutorService> 
std::make_shared<pulsar::ExecutorService>() (shared_ptr.h:718)
    ==10034==    by 0x4FDEFBD: pulsar::ExecutorServiceProvider::get() 
(ExecutorService.cc:90)
    ```
---
 pulsar-client-cpp/lib/ClientConnection.cc |  6 +++---
 pulsar-client-cpp/lib/ExecutorService.cc  | 18 ++++++++++++------
 pulsar-client-cpp/lib/ExecutorService.h   |  4 ++--
 3 files changed, 17 insertions(+), 11 deletions(-)

diff --git a/pulsar-client-cpp/lib/ClientConnection.cc 
b/pulsar-client-cpp/lib/ClientConnection.cc
index 384789a..be83370 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -145,11 +145,11 @@ ClientConnection::ClientConnection(const std::string& 
logicalAddress, const std:
       resolver_(executor_->createTcpResolver()),
       socket_(executor_->createSocket()),
 #if BOOST_VERSION >= 107000
-      strand_(boost::asio::make_strand(executor_->io_service_.get_executor())),
+      
strand_(boost::asio::make_strand(executor_->io_service_->get_executor())),
 #elif BOOST_VERSION >= 106600
-      strand_(executor_->io_service_.get_executor()),
+      strand_(executor_->io_service_->get_executor()),
 #else
-      strand_(executor_->io_service_),
+      strand_(*(executor_->io_service_)),
 #endif
       logicalAddress_(logicalAddress),
       physicalAddress_(physicalAddress),
diff --git a/pulsar-client-cpp/lib/ExecutorService.cc 
b/pulsar-client-cpp/lib/ExecutorService.cc
index 4d3e4ed..75f465d 100644
--- a/pulsar-client-cpp/lib/ExecutorService.cc
+++ b/pulsar-client-cpp/lib/ExecutorService.cc
@@ -25,15 +25,21 @@
 namespace pulsar {
 
 ExecutorService::ExecutorService()
-    : io_service_(), work_(new BackgroundWork(io_service_)), worker_([&]() { 
io_service_.run(); }) {}
+    : io_service_(new boost::asio::io_service()),
+      work_(new BackgroundWork(*io_service_)),
+      worker_(std::bind(&ExecutorService::startWorker, this, io_service_)) {}
 
 ExecutorService::~ExecutorService() { close(); }
 
+void ExecutorService::startWorker(std::shared_ptr<boost::asio::io_service> 
io_service) { io_service_->run(); }
+
 /*
  *  factory method of boost::asio::ip::tcp::socket associated with io_service_ 
instance
  *  @ returns shared_ptr to this socket
  */
-SocketPtr ExecutorService::createSocket() { return SocketPtr(new 
boost::asio::ip::tcp::socket(io_service_)); }
+SocketPtr ExecutorService::createSocket() {
+    return SocketPtr(new boost::asio::ip::tcp::socket(*io_service_));
+}
 
 TlsSocketPtr ExecutorService::createTlsSocket(SocketPtr &socket, 
boost::asio::ssl::context &ctx) {
     return 
std::shared_ptr<boost::asio::ssl::stream<boost::asio::ip::tcp::socket &> >(
@@ -45,24 +51,24 @@ TlsSocketPtr ExecutorService::createTlsSocket(SocketPtr 
&socket, boost::asio::ss
  *  @returns shraed_ptr to resolver object
  */
 TcpResolverPtr ExecutorService::createTcpResolver() {
-    return TcpResolverPtr(new boost::asio::ip::tcp::resolver(io_service_));
+    return TcpResolverPtr(new boost::asio::ip::tcp::resolver(*io_service_));
 }
 
 DeadlineTimerPtr ExecutorService::createDeadlineTimer() {
-    return DeadlineTimerPtr(new boost::asio::deadline_timer(io_service_));
+    return DeadlineTimerPtr(new boost::asio::deadline_timer(*io_service_));
 }
 
 void ExecutorService::close() {
     // Ensure this service has not already been closed. This is
     // because worker_.join() is not re-entrant on Windows
     if (work_) {
-        io_service_.stop();
+        io_service_->stop();
         work_.reset();
         worker_.join();
     }
 }
 
-void ExecutorService::postWork(std::function<void(void)> task) { 
io_service_.post(task); }
+void ExecutorService::postWork(std::function<void(void)> task) { 
io_service_->post(task); }
 
 /////////////////////
 
diff --git a/pulsar-client-cpp/lib/ExecutorService.h 
b/pulsar-client-cpp/lib/ExecutorService.h
index 7162732..883d4ce 100644
--- a/pulsar-client-cpp/lib/ExecutorService.h
+++ b/pulsar-client-cpp/lib/ExecutorService.h
@@ -51,12 +51,12 @@ class PULSAR_PUBLIC ExecutorService : private 
boost::noncopyable {
     /*
      *  only called once and within lock so no need to worry about 
thread-safety
      */
-    void startWorker();
+    void startWorker(std::shared_ptr<boost::asio::io_service> io_service);
 
     /*
      * io_service is our interface to os, io object schedule async ops on this 
object
      */
-    boost::asio::io_service io_service_;
+    std::shared_ptr<boost::asio::io_service> io_service_;
 
     /*
      * work will not let io_service.run() return even after it has finished 
work

Reply via email to