merlimat opened a new pull request #6226: [C++] Ensure ExecutorServicePtr is 
not destroyed while the timer object is alive
URL: https://github.com/apache/pulsar/pull/6226
 
 
   ### Motivation
   
   The `ProducerStatsImpl` and `ConsumerStatsImpl` objects are keeping a 
`shared_ptr` to a timer object. While that keeps the timer alive, it does not 
keep the required event loop object. 
   
   When the stats objects are destroyed, we cancel the timer, though that will 
use the already destroyed `ExecutorService`. In most cases this will not show 
any problem, though in others it will cause a segfault.
   
   Valgrind output showing the issue:
   
   ```
   ==12549== Invalid read of size 4
   ==12549==    at 0x5966010: __pthread_mutex_unlock_full 
(pthread_mutex_unlock.c:100)
   ==12549==    by 0x5BFECD9: unlock (posix_mutex.hpp:58)
   ==12549==    by 0x5BFECD9: unlock (scoped_lock.hpp:72)
   ==12549==    by 0x5BFECD9: unsigned long 
boost::asio::detail::epoll_reactor::cancel_timer<boost::asio::time_traits<boost::posix_time::ptime>
 
>(boost::asio::detail::timer_queue<boost::asio::time_traits<boost::posix_time::ptime>
 >&, 
boost::asio::detail::timer_queue<boost::asio::time_traits<boost::posix_time::ptime>
 >::per_timer_data&, unsigned long) (epoll_reactor.hpp:65)
   ==12549==    by 0x5D28808: cancel (deadline_timer_service.hpp:108)
   ==12549==    by 0x5D28808: cancel (deadline_timer_service.hpp:96)
   ==12549==    by 0x5D28808: cancel (basic_deadline_timer.hpp:216)
   ==12549==    by 0x5D28808: pulsar::ConsumerStatsImpl::~ConsumerStatsImpl() 
(ConsumerStatsImpl.cc:70)
   ==12549==    by 0x41F2F5: 
std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release() 
(shared_ptr_base.h:150)
   ==12549==    by 0x5C50211: ~__shared_count (shared_ptr_base.h:659)
   ==12549==    by 0x5C50211: ~__shared_ptr (shared_ptr_base.h:925)
   ==12549==    by 0x5C50211: ~shared_ptr (shared_ptr.h:93)
   ==12549==    by 0x5C50211: pulsar::ConsumerImpl::~ConsumerImpl() 
(ConsumerImpl.cc:95)
   ==12549==    by 0x41F2F5: 
std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release() 
(shared_ptr_base.h:150)
   ==12549==    by 0x4E39EC: ~__shared_count (shared_ptr_base.h:659)
   ==12549==    by 0x4E39EC: ~__shared_ptr (shared_ptr_base.h:925)
   ==12549==    by 0x4E39EC: ~shared_ptr (shared_ptr.h:93)
   ==12549==    by 0x4E39EC: 
BasicEndToEndTest_testReceiveAsyncFailedConsumer_Test::TestBody() 
(Consumer.h:35)
   ==12549==    by 0x541853: void 
testing::internal::HandleSehExceptionsInMethodIfSupported<testing::Test, 
void>(testing::Test*, void (testing::Test::*)(), char const*) (in 
/pulsar/pulsar-client-cpp/tests/main)
   ==12549==    by 0x53C484: void 
testing::internal::HandleExceptionsInMethodIfSupported<testing::Test, 
void>(testing::Test*, void (testing::Test::*)(), char const*) (in 
/pulsar/pulsar-client-cpp/tests/main)
   ==12549==    by 0x521911: testing::Test::Run() (in 
/pulsar/pulsar-client-cpp/tests/main)
   ==12549==    by 0x5221B9: testing::TestInfo::Run() (in 
/pulsar/pulsar-client-cpp/tests/main)
   ==12549==    by 0x5228A8: testing::TestCase::Run() (in 
/pulsar/pulsar-client-cpp/tests/main)
   ==12549==  Address 0x10837bb0 is 64 bytes inside a block of size 176 free'd
   ==12549==    at 0x4C2F24B: operator delete(void*) (in 
/usr/lib/valgrind/vgpreload_memcheck-amd64-linux.so)
   ==12549==    by 0x431509: 
boost::asio::detail::epoll_reactor::~epoll_reactor() (epoll_reactor.ipp:69)
   ==12549==    by 0x5CEF161: destroy (service_registry.ipp:101)
   ==12549==    by 0x5CEF161: ~service_registry (service_registry.ipp:45)
   ==12549==    by 0x5CEF161: ~io_service (io_service.ipp:53)
   ==12549==    by 0x5CEF161: pulsar::ExecutorService::~ExecutorService() 
(ExecutorService.cc:30)
   ==12549==    by 0x41F2F5: 
std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release() 
(shared_ptr_base.h:150)
   ==12549==    by 0x5C4FD2A: ~__shared_count (shared_ptr_base.h:659)
   ==12549==    by 0x5C4FD2A: ~__shared_ptr (shared_ptr_base.h:925)
   ==12549==    by 0x5C4FD2A: ~shared_ptr (shared_ptr.h:93)
   ==12549==    by 0x5C4FD2A: ~NegativeAcksTracker (NegativeAcksTracker.h:32)
   ==12549==    by 0x5C4FD2A: pulsar::ConsumerImpl::~ConsumerImpl() 
(ConsumerImpl.cc:95)
   ==12549==    by 0x41F2F5: 
std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release() 
(shared_ptr_base.h:150)
   ==12549==    by 0x4E39EC: ~__shared_count (shared_ptr_base.h:659)
   ==12549==    by 0x4E39EC: ~__shared_ptr (shared_ptr_base.h:925)
   ==12549==    by 0x4E39EC: ~shared_ptr (shared_ptr.h:93)
   ==12549==    by 0x4E39EC: 
BasicEndToEndTest_testReceiveAsyncFailedConsumer_Test::TestBody() 
(Consumer.h:35)
   ==12549==    by 0x541853: void 
testing::internal::HandleSehExceptionsInMethodIfSupported<testing::Test, 
void>(testing::Test*, void (testing::Test::*)(), char const*) (in 
/pulsar/pulsar-client-cpp/tests/main)
   ==12549==    by 0x53C484: void 
testing::internal::HandleExceptionsInMethodIfSupported<testing::Test, 
void>(testing::Test*, void (testing::Test::*)(), char const*) (in 
/pulsar/pulsar-client-cpp/tests/main)
   ==12549==    by 0x521911: testing::Test::Run() (in 
/pulsar/pulsar-client-cpp/tests/main)
   ==12549==    by 0x5221B9: testing::TestInfo::Run() (in 
/pulsar/pulsar-client-cpp/tests/main)
   ==12549==    by 0x5228A8: testing::TestCase::Run() (in 
/pulsar/pulsar-client-cpp/tests/main)
   ==12549==  Block was alloc'd at
   ==12549==    at 0x4C2E0EF: operator new(unsigned long) (in 
/usr/lib/valgrind/vgpreload_memcheck-amd64-linux.so)
   ==12549==    by 0x43763A: boost::asio::io_service::service* 
boost::asio::detail::service_registry::create<boost::asio::detail::epoll_reactor>(boost::asio::io_service&)
 (service_registry.hpp:81)
   ==12549==    by 0x432497: 
boost::asio::detail::service_registry::do_use_service(boost::asio::io_service::service::key
 const&, boost::asio::io_service::service* (*)(boost::asio::io_service&)) 
(service_registry.ipp:123)
   ==12549==    by 0x4325B6: use_service<boost::asio::detail::epoll_reactor> 
(service_registry.hpp:48)
   ==12549==    by 0x4325B6: use_service<boost::asio::detail::epoll_reactor> 
(io_service.hpp:33)
   ==12549==    by 0x4325B6: reactive_socket_service_base 
(reactive_socket_service_base.ipp:33)
   ==12549==    by 0x4325B6: reactive_socket_service 
(reactive_socket_service.hpp:77)
   ==12549==    by 0x4325B6: stream_socket_service 
(stream_socket_service.hpp:95)
   ==12549==    by 0x4325B6: boost::asio::io_service::service* 
boost::asio::detail::service_registry::create<boost::asio::stream_socket_service<boost::asio::ip::tcp>
 >(boost::asio::io_service&) (service_registry.hpp:81)
   ==12549==    by 0x5CF0E5D: do_use_service (service_registry.ipp:123)
   ==12549==    by 0x5CF0E5D: 
use_service<boost::asio::stream_socket_service<boost::asio::ip::tcp> > 
(service_registry.hpp:48)
   ==12549==    by 0x5CF0E5D: 
use_service<boost::asio::stream_socket_service<boost::asio::ip::tcp> > 
(io_service.hpp:33)
   ==12549==    by 0x5CF0E5D: basic_io_object (basic_io_object.hpp:183)
   ==12549==    by 0x5CF0E5D: basic_socket (basic_socket.hpp:71)
   ==12549==    by 0x5CF0E5D: basic_stream_socket (basic_stream_socket.hpp:73)
   ==12549==    by 0x5CF0E5D: pulsar::ExecutorService::createSocket() 
(ExecutorService.cc:36)
   ==12549==    by 0x5CB1794: 
pulsar::ClientConnection::ClientConnection(std::__cxx11::basic_string<char, 
std::char_traits<char>, std::allocator<char> > const&, 
std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > 
const&, std::shared_ptr<pulsar::ExecutorService>, pulsar::ClientConfiguration 
const&, std::shared_ptr<pulsar::Authentication> const&) 
(ClientConnection.cc:169)
   ==12549==    by 0x5C1D3F0: 
pulsar::ConnectionPool::getConnectionAsync(std::__cxx11::basic_string<char, 
std::char_traits<char>, std::allocator<char> > const&, 
std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > 
const&) (ConnectionPool.cc:83)
   ==12549==    by 0x5CA573F: 
pulsar::BinaryProtoLookupService::getPartitionMetadataAsync(std::shared_ptr<pulsar::TopicName>
 const&) (BinaryProtoLookupService.cc:72)
   ==12549==    by 0x5D00CA6: 
pulsar::ClientImpl::subscribeAsync(std::__cxx11::basic_string<char, 
std::char_traits<char>, std::allocator<char> > const&, 
std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > 
const&, pulsar::ConsumerConfiguration const&, std::function<void 
(pulsar::Result, pulsar::Consumer)>) (ClientImpl.cc:350)
   ==12549==    by 0x5C985EC: 
pulsar::Client::subscribeAsync(std::__cxx11::basic_string<char, 
std::char_traits<char>, std::allocator<char> > const&, 
std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > 
const&, pulsar::ConsumerConfiguration const&, std::function<void 
(pulsar::Result, pulsar::Consumer)>) (Client.cc:89)
   ==12549==    by 0x5C98803: 
pulsar::Client::subscribeAsync(std::__cxx11::basic_string<char, 
std::char_traits<char>, std::allocator<char> > const&, 
std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > 
const&, std::function<void (pulsar::Result, pulsar::Consumer)>) (Client.cc:83)
   ==12549==    by 0x4E3852: 
BasicEndToEndTest_testReceiveAsyncFailedConsumer_Test::TestBody() 
(BasicEndToEndTest.cc:2848)
   ==12549==
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to