merlimat opened a new pull request #6226: [C++] Ensure ExecutorServicePtr is not destroyed while the timer object is alive URL: https://github.com/apache/pulsar/pull/6226 ### Motivation The `ProducerStatsImpl` and `ConsumerStatsImpl` objects are keeping a `shared_ptr` to a timer object. While that keeps the timer alive, it does not keep the required event loop object. When the stats objects are destroyed, we cancel the timer, though that will use the already destroyed `ExecutorService`. In most cases this will not show any problem, though in others it will cause a segfault. Valgrind output showing the issue: ``` ==12549== Invalid read of size 4 ==12549== at 0x5966010: __pthread_mutex_unlock_full (pthread_mutex_unlock.c:100) ==12549== by 0x5BFECD9: unlock (posix_mutex.hpp:58) ==12549== by 0x5BFECD9: unlock (scoped_lock.hpp:72) ==12549== by 0x5BFECD9: unsigned long boost::asio::detail::epoll_reactor::cancel_timer<boost::asio::time_traits<boost::posix_time::ptime> >(boost::asio::detail::timer_queue<boost::asio::time_traits<boost::posix_time::ptime> >&, boost::asio::detail::timer_queue<boost::asio::time_traits<boost::posix_time::ptime> >::per_timer_data&, unsigned long) (epoll_reactor.hpp:65) ==12549== by 0x5D28808: cancel (deadline_timer_service.hpp:108) ==12549== by 0x5D28808: cancel (deadline_timer_service.hpp:96) ==12549== by 0x5D28808: cancel (basic_deadline_timer.hpp:216) ==12549== by 0x5D28808: pulsar::ConsumerStatsImpl::~ConsumerStatsImpl() (ConsumerStatsImpl.cc:70) ==12549== by 0x41F2F5: std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release() (shared_ptr_base.h:150) ==12549== by 0x5C50211: ~__shared_count (shared_ptr_base.h:659) ==12549== by 0x5C50211: ~__shared_ptr (shared_ptr_base.h:925) ==12549== by 0x5C50211: ~shared_ptr (shared_ptr.h:93) ==12549== by 0x5C50211: pulsar::ConsumerImpl::~ConsumerImpl() (ConsumerImpl.cc:95) ==12549== by 0x41F2F5: std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release() (shared_ptr_base.h:150) ==12549== by 0x4E39EC: ~__shared_count (shared_ptr_base.h:659) ==12549== by 0x4E39EC: ~__shared_ptr (shared_ptr_base.h:925) ==12549== by 0x4E39EC: ~shared_ptr (shared_ptr.h:93) ==12549== by 0x4E39EC: BasicEndToEndTest_testReceiveAsyncFailedConsumer_Test::TestBody() (Consumer.h:35) ==12549== by 0x541853: void testing::internal::HandleSehExceptionsInMethodIfSupported<testing::Test, void>(testing::Test*, void (testing::Test::*)(), char const*) (in /pulsar/pulsar-client-cpp/tests/main) ==12549== by 0x53C484: void testing::internal::HandleExceptionsInMethodIfSupported<testing::Test, void>(testing::Test*, void (testing::Test::*)(), char const*) (in /pulsar/pulsar-client-cpp/tests/main) ==12549== by 0x521911: testing::Test::Run() (in /pulsar/pulsar-client-cpp/tests/main) ==12549== by 0x5221B9: testing::TestInfo::Run() (in /pulsar/pulsar-client-cpp/tests/main) ==12549== by 0x5228A8: testing::TestCase::Run() (in /pulsar/pulsar-client-cpp/tests/main) ==12549== Address 0x10837bb0 is 64 bytes inside a block of size 176 free'd ==12549== at 0x4C2F24B: operator delete(void*) (in /usr/lib/valgrind/vgpreload_memcheck-amd64-linux.so) ==12549== by 0x431509: boost::asio::detail::epoll_reactor::~epoll_reactor() (epoll_reactor.ipp:69) ==12549== by 0x5CEF161: destroy (service_registry.ipp:101) ==12549== by 0x5CEF161: ~service_registry (service_registry.ipp:45) ==12549== by 0x5CEF161: ~io_service (io_service.ipp:53) ==12549== by 0x5CEF161: pulsar::ExecutorService::~ExecutorService() (ExecutorService.cc:30) ==12549== by 0x41F2F5: std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release() (shared_ptr_base.h:150) ==12549== by 0x5C4FD2A: ~__shared_count (shared_ptr_base.h:659) ==12549== by 0x5C4FD2A: ~__shared_ptr (shared_ptr_base.h:925) ==12549== by 0x5C4FD2A: ~shared_ptr (shared_ptr.h:93) ==12549== by 0x5C4FD2A: ~NegativeAcksTracker (NegativeAcksTracker.h:32) ==12549== by 0x5C4FD2A: pulsar::ConsumerImpl::~ConsumerImpl() (ConsumerImpl.cc:95) ==12549== by 0x41F2F5: std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release() (shared_ptr_base.h:150) ==12549== by 0x4E39EC: ~__shared_count (shared_ptr_base.h:659) ==12549== by 0x4E39EC: ~__shared_ptr (shared_ptr_base.h:925) ==12549== by 0x4E39EC: ~shared_ptr (shared_ptr.h:93) ==12549== by 0x4E39EC: BasicEndToEndTest_testReceiveAsyncFailedConsumer_Test::TestBody() (Consumer.h:35) ==12549== by 0x541853: void testing::internal::HandleSehExceptionsInMethodIfSupported<testing::Test, void>(testing::Test*, void (testing::Test::*)(), char const*) (in /pulsar/pulsar-client-cpp/tests/main) ==12549== by 0x53C484: void testing::internal::HandleExceptionsInMethodIfSupported<testing::Test, void>(testing::Test*, void (testing::Test::*)(), char const*) (in /pulsar/pulsar-client-cpp/tests/main) ==12549== by 0x521911: testing::Test::Run() (in /pulsar/pulsar-client-cpp/tests/main) ==12549== by 0x5221B9: testing::TestInfo::Run() (in /pulsar/pulsar-client-cpp/tests/main) ==12549== by 0x5228A8: testing::TestCase::Run() (in /pulsar/pulsar-client-cpp/tests/main) ==12549== Block was alloc'd at ==12549== at 0x4C2E0EF: operator new(unsigned long) (in /usr/lib/valgrind/vgpreload_memcheck-amd64-linux.so) ==12549== by 0x43763A: boost::asio::io_service::service* boost::asio::detail::service_registry::create<boost::asio::detail::epoll_reactor>(boost::asio::io_service&) (service_registry.hpp:81) ==12549== by 0x432497: boost::asio::detail::service_registry::do_use_service(boost::asio::io_service::service::key const&, boost::asio::io_service::service* (*)(boost::asio::io_service&)) (service_registry.ipp:123) ==12549== by 0x4325B6: use_service<boost::asio::detail::epoll_reactor> (service_registry.hpp:48) ==12549== by 0x4325B6: use_service<boost::asio::detail::epoll_reactor> (io_service.hpp:33) ==12549== by 0x4325B6: reactive_socket_service_base (reactive_socket_service_base.ipp:33) ==12549== by 0x4325B6: reactive_socket_service (reactive_socket_service.hpp:77) ==12549== by 0x4325B6: stream_socket_service (stream_socket_service.hpp:95) ==12549== by 0x4325B6: boost::asio::io_service::service* boost::asio::detail::service_registry::create<boost::asio::stream_socket_service<boost::asio::ip::tcp> >(boost::asio::io_service&) (service_registry.hpp:81) ==12549== by 0x5CF0E5D: do_use_service (service_registry.ipp:123) ==12549== by 0x5CF0E5D: use_service<boost::asio::stream_socket_service<boost::asio::ip::tcp> > (service_registry.hpp:48) ==12549== by 0x5CF0E5D: use_service<boost::asio::stream_socket_service<boost::asio::ip::tcp> > (io_service.hpp:33) ==12549== by 0x5CF0E5D: basic_io_object (basic_io_object.hpp:183) ==12549== by 0x5CF0E5D: basic_socket (basic_socket.hpp:71) ==12549== by 0x5CF0E5D: basic_stream_socket (basic_stream_socket.hpp:73) ==12549== by 0x5CF0E5D: pulsar::ExecutorService::createSocket() (ExecutorService.cc:36) ==12549== by 0x5CB1794: pulsar::ClientConnection::ClientConnection(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::shared_ptr<pulsar::ExecutorService>, pulsar::ClientConfiguration const&, std::shared_ptr<pulsar::Authentication> const&) (ClientConnection.cc:169) ==12549== by 0x5C1D3F0: pulsar::ConnectionPool::getConnectionAsync(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&) (ConnectionPool.cc:83) ==12549== by 0x5CA573F: pulsar::BinaryProtoLookupService::getPartitionMetadataAsync(std::shared_ptr<pulsar::TopicName> const&) (BinaryProtoLookupService.cc:72) ==12549== by 0x5D00CA6: pulsar::ClientImpl::subscribeAsync(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, pulsar::ConsumerConfiguration const&, std::function<void (pulsar::Result, pulsar::Consumer)>) (ClientImpl.cc:350) ==12549== by 0x5C985EC: pulsar::Client::subscribeAsync(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, pulsar::ConsumerConfiguration const&, std::function<void (pulsar::Result, pulsar::Consumer)>) (Client.cc:89) ==12549== by 0x5C98803: pulsar::Client::subscribeAsync(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::function<void (pulsar::Result, pulsar::Consumer)>) (Client.cc:83) ==12549== by 0x4E3852: BasicEndToEndTest_testReceiveAsyncFailedConsumer_Test::TestBody() (BasicEndToEndTest.cc:2848) ==12549== ```
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
