merlimat commented on issue #127:
URL: 
https://github.com/apache/pulsar-client-python/issues/127#issuecomment-1572866434

   I tried to expose the manual hook in C++ client and call it from Python 
wrapper. 
   So far I was not successful in not making it crash when 
   
   
   ### C++ Diff
   
   ```diff
   diff --git include/pulsar/Client.h include/pulsar/Client.h
   index 3514934..d648612 100644
   --- include/pulsar/Client.h
   +++ include/pulsar/Client.h
   @@ -46,6 +46,12 @@ class ClientImpl;
    class PulsarFriend;
    class PulsarWrapper;
    
   +enum ForkEvent {
   +    ForkPrepare,
   +    ForkParent,
   +    ForkChild
   +};
   +
    class PULSAR_PUBLIC Client {
       public:
        /**
   @@ -414,6 +420,26 @@ class PULSAR_PUBLIC Client {
        void getSchemaInfoAsync(const std::string& topic, int64_t version,
                                std::function<void(Result, const SchemaInfo&)> 
callback);
    
   +    /**
   +     * Notify the Pulsar client instance about an OS fork() operation.
   +     * This will help ensure the state of threads and mutexes is kept 
consistent
   +     * after the fork is completed.
   +     *
   +     * Example of how to use this:
   +     *
   +     * <code>
   +     * pulsarClient.notifyFork(pulsar::ForkPrepare);
   +     * if (fork() == 0) {
   +     *     // This is the child process.
   +     *     pulsarClient.notifyFork(pulsar::ForkChild);
   +     * } else {
   +     *     // This is the parent process.
   +     *     pulsarClient.NotifyFork(pulsar::ForkParent);
   +     * }
   +     * </code>
   +     */
   +    void notifyFork(ForkEvent forkEvent);
   +
       private:
        Client(const std::string& serviceUrl, const ClientConfiguration& 
clientConfiguration,
               bool poolConnections);
   diff --git lib/Client.cc lib/Client.cc
   index 48c4a67..fa92a43 100644
   --- lib/Client.cc
   +++ lib/Client.cc
   @@ -192,6 +192,8 @@ void Client::closeAsync(CloseCallback callback) { 
impl_->closeAsync(callback); }
    
    void Client::shutdown() { impl_->shutdown(); }
    
   +void Client::notifyFork(ForkEvent forkEvent) { 
impl_->notifyFork(forkEvent); }
   +
    uint64_t Client::getNumberOfProducers() { return 
impl_->getNumberOfProducers(); }
    uint64_t Client::getNumberOfConsumers() { return 
impl_->getNumberOfConsumers(); }
    
   diff --git lib/ClientImpl.cc lib/ClientImpl.cc
   index a8fc24c..e912cff 100644
   --- lib/ClientImpl.cc
   +++ lib/ClientImpl.cc
   @@ -772,4 +772,20 @@ std::string ClientImpl::getClientVersion(const 
ClientConfiguration& clientConfig
        return oss.str();
    }
    
   +void ClientImpl::notifyFork(ForkEvent forkEvent) {
   +    Lock lock(mutex_);
   +
   +//    if (ioExecutorProvider_) {
   +//        ioExecutorProvider_->notifyFork(forkEvent);
   +//    }
   +
   +    if (listenerExecutorProvider_) {
   +        listenerExecutorProvider_->notifyFork(forkEvent);
   +    }
   +
   +    if (partitionListenerExecutorProvider_) {
   +        partitionListenerExecutorProvider_->notifyFork(forkEvent);
   +    }
   +}
   +
    } /* namespace pulsar */
   diff --git lib/ClientImpl.h lib/ClientImpl.h
   index 9ee7095..ce12fd2 100644
   --- lib/ClientImpl.h
   +++ lib/ClientImpl.h
   @@ -124,6 +124,8 @@ class ClientImpl : public 
std::enable_shared_from_this<ClientImpl> {
    
        std::shared_ptr<std::atomic<uint64_t>> getRequestIdGenerator() const { 
return requestIdGenerator_; }
    
   +    void notifyFork(ForkEvent forkEvent);
   +
        friend class PulsarFriend;
    
       private:
   diff --git lib/ExecutorService.cc lib/ExecutorService.cc
   index a0dff0b..fa2829e 100644
   --- lib/ExecutorService.cc
   +++ lib/ExecutorService.cc
   @@ -20,6 +20,7 @@
    
    #include "LogUtils.h"
    #include "TimeUtils.h"
   +#include "Latch.h"
    DECLARE_LOG_OBJECT()
    
    namespace pulsar {
   @@ -157,4 +158,32 @@ void ExecutorServiceProvider::close(long timeoutMs) {
            executor.reset();
        }
    }
   +
   +
   +static void notifyForkOnIoService(ExecutorServicePtr executor, 
boost::asio::io_service::fork_event event) {
   +    Latch l(1);
   +
   +    executor->postWork([=, &l] {
   +        executor->getIOService().notify_fork(event);
   +        l.countdown();
   +    });
   +
   +    l.wait();
   +}
   +
   +void ExecutorServiceProvider::notifyFork(ForkEvent forkEvent) {
   +    Lock lock(mutex_);
   +
   +    for (auto& executor : executors_) {
   +        switch (forkEvent) {
   +            case ForkPrepare:
   +                notifyForkOnIoService(executor, 
boost::asio::io_service::fork_prepare);
   +            case ForkParent:
   +                notifyForkOnIoService(executor, 
boost::asio::io_service::fork_parent);
   +            case ForkChild:
   +                notifyForkOnIoService(executor, 
boost::asio::io_service::fork_child);
   +        }
   +    }
   +}
   +
    }  // namespace pulsar
   diff --git lib/ExecutorService.h lib/ExecutorService.h
   index 4717ccb..d14de4c 100644
   --- lib/ExecutorService.h
   +++ lib/ExecutorService.h
   @@ -32,6 +32,7 @@
    #include <memory>
    #include <mutex>
    #include <thread>
   +#include <pulsar/Client.h>
    
    namespace pulsar {
    typedef std::shared_ptr<boost::asio::ip::tcp::socket> SocketPtr;
   @@ -93,6 +94,8 @@ class PULSAR_PUBLIC ExecutorServiceProvider {
        // See TimeoutProcessor for the semantics of the parameter.
        void close(long timeoutMs = 3000);
    
   +    void notifyFork(ForkEvent forkEvent);
   +
       private:
        typedef std::vector<ExecutorServicePtr> ExecutorList;
        ExecutorList executors_;
   
   ```
   
   ### Python Diff:
   
   ```diff
   diff --git pulsar/__init__.py pulsar/__init__.py
   index dbf3d82..4cb55ad 100644
   --- pulsar/__init__.py
   +++ pulsar/__init__.py
   @@ -58,6 +58,7 @@ from pulsar.functions.serde import SerDe, IdentitySerDe, 
PickleSerDe
    from pulsar import schema
    _schema = schema
    
   +import os
    import re
    _retype = type(re.compile('x'))
    
   @@ -491,6 +492,14 @@ class Client:
            conf.tls_validate_hostname(tls_validate_hostname)
            self._client = _pulsar.Client(service_url, conf)
            self._consumers = []
   +        # Prepare to handle forks
   +        os.register_at_fork(before=lambda: 
self._notifyFork(_pulsar.ForkEvent.Prepare),
   +                            after_in_child=lambda: 
self._notifyFork(_pulsar.ForkEvent.Child),
   +                            after_in_parent=lambda: 
self._notifyFork(_pulsar.ForkEvent.Parent))
   +
   +    def _notifyFork(self, event):
   +        print("NOTIFY FORK: %s" % event)
   +        self._client.notifyFork(event)
    
        @staticmethod
        def _prepare_logger(logger):
   diff --git src/client.cc src/client.cc
   index 626ff9f..2369842 100644
   --- src/client.cc
   +++ src/client.cc
   @@ -79,5 +79,6 @@ void export_client(py::module_& m) {
            .def("get_topic_partitions", &Client_getTopicPartitions)
            .def("get_schema_info", &Client_getSchemaInfo)
            .def("close", &Client_close)
   +        .def("notifyFork", &Client::notifyFork)
            .def("shutdown", &Client::shutdown);
    }
   diff --git src/enums.cc src/enums.cc
   index f61011f..3e63de7 100644
   --- src/enums.cc
   +++ src/enums.cc
   @@ -124,4 +124,9 @@ void export_enums(py::module_& m) {
            .value("Info", Logger::LEVEL_INFO)
            .value("Warn", Logger::LEVEL_WARN)
            .value("Error", Logger::LEVEL_ERROR);
   +
   +    enum_<ForkEvent>(m, "ForkEvent")
   +        .value("Prepare", ForkPrepare)
   +        .value("Parent", ForkParent)
   +        .value("Child", ForkChild);
    }
   
   ```
   
   ### Still crashing:
   
   ```
   Thread 0 Crashed::  Dispatch queue: com.apple.main-thread
   0   _pulsar.cpython-311-darwin.so           0x10857682c void 
boost::asio::io_context::initiate_post::operator()<std::__1::function<void 
()>&>(std::__1::function<void ()>&, boost::asio::io_context*) const + 184 
(io_context.hpp:194)
   1   _pulsar.cpython-311-darwin.so           0x108576768 void 
boost::asio::detail::completion_handler_async_result<std::__1::function<void 
()>, void ()>::initiate<boost::asio::io_context::initiate_post, 
std::__1::function<void ()>&, 
boost::asio::io_context*>(boost::asio::io_context::initiate_post&&, 
std::__1::function<void ()>&, boost::asio::io_context*&&) + 44 
(async_result.hpp:482)
   2   _pulsar.cpython-311-darwin.so           0x108576730 
boost::asio::constraint<detail::async_result_has_initiate_memfn<std::__1::function<void
 ()>&, void ()>::value, 
decltype(async_result<std::__1::decay<std::__1::function<void ()>&>::type, void 
()>::initiate(declval<boost::asio::io_context::initiate_post&&>(), 
declval<std::__1::function<void ()>&>(), 
declval<boost::asio::io_context*&&>()))>::type 
boost::asio::async_initiate<std::__1::function<void ()>&, void (), 
boost::asio::io_context::initiate_post, 
boost::asio::io_context*>(boost::asio::io_context::initiate_post&&, 
std::__1::function<void ()>&, boost::asio::io_context*&&) + 40 
(async_result.hpp:895)
   3   _pulsar.cpython-311-darwin.so           0x108562098 
decltype(async_initiate<std::__1::function<void ()>&, void 
()>(decltype(__declval<std::__1::function<void ()>&>(0)) 
std::__1::declval<boost::asio::io_context::initiate_post>()(), fp, this)) 
boost::asio::io_context::post<std::__1::function<void 
()>&>(std::__1::function<void ()>&) + 44 (io_context.hpp:206)
   4   _pulsar.cpython-311-darwin.so           0x108562060 
pulsar::ExecutorService::postWork(std::__1::function<void ()>) + 28 
(ExecutorService.cc:130)
   5   _pulsar.cpython-311-darwin.so           0x1085626dc 
pulsar::notifyForkOnIoService(std::__1::shared_ptr<pulsar::ExecutorService>, 
boost::asio::execution_context::fork_event) + 128 (ExecutorService.cc:166)
   6   _pulsar.cpython-311-darwin.so           0x108562574 
pulsar::ExecutorServiceProvider::notifyFork(pulsar::ForkEvent) + 200 
(ExecutorService.cc:180)
   7   _pulsar.cpython-311-darwin.so           0x108446e70 
pulsar::ClientImpl::notifyFork(pulsar::ForkEvent) + 80 (ClientImpl.cc:783)
   8   _pulsar.cpython-311-darwin.so           0x108386860 
pulsar::Client::notifyFork(pulsar::ForkEvent) + 36 (Client.cc:195)
   9   _pulsar.cpython-311-darwin.so           0x108249460 
pybind11::cpp_function::cpp_function<void, pulsar::Client, pulsar::ForkEvent, 
pybind11::name, pybind11::is_method, pybind11::sibling>(void 
(pulsar::Client::*)(pulsar::ForkEvent), pybind11::name const&, 
pybind11::is_method const&, pybind11::sibling 
const&)::'lambda'(pulsar::Client*, 
pulsar::ForkEvent)::operator()(pulsar::Client*, pulsar::ForkEvent) const + 120 
(pybind11.h:109)
   10  _pulsar.cpython-311-darwin.so           0x1082493dc void 
pybind11::detail::argument_loader<pulsar::Client*, 
pulsar::ForkEvent>::call_impl<void, pybind11::cpp_function::cpp_function<void, 
pulsar::Client, pulsar::ForkEvent, pybind11::name, pybind11::is_method, 
pybind11::sibling>(void (pulsar::Client::*)(pulsar::ForkEvent), pybind11::name 
const&, pybind11::is_method const&, pybind11::sibling 
const&)::'lambda'(pulsar::Client*, pulsar::ForkEvent)&, 0ul, 1ul, 
pybind11::detail::void_type>(pulsar::Client&&, 
pybind11::detail::index_sequence<0ul, 1ul>, pybind11::detail::void_type&&) && + 
88 (cast.h:1439)
   11  _pulsar.cpython-311-darwin.so           0x108248f78 
std::__1::enable_if<std::is_void<void>::value, 
pybind11::detail::void_type>::type 
pybind11::detail::argument_loader<pulsar::Client*, 
pulsar::ForkEvent>::call<void, pybind11::detail::void_type, 
pybind11::cpp_function::cpp_function<void, pulsar::Client, pulsar::ForkEvent, 
pybind11::name, pybind11::is_method, pybind11::sibling>(void 
(pulsar::Client::*)(pulsar::ForkEvent), pybind11::name const&, 
pybind11::is_method const&, pybind11::sibling 
const&)::'lambda'(pulsar::Client*, 
pulsar::ForkEvent)&>(pybind11::cpp_function::cpp_function<void, pulsar::Client, 
pulsar::ForkEvent, pybind11::name, pybind11::is_method, pybind11::sibling>(void 
(pulsar::Client::*)(pulsar::ForkEvent), pybind11::name const&, 
pybind11::is_method const&, pybind11::sibling 
const&)::'lambda'(pulsar::Client*, pulsar::ForkEvent)&) && + 36 (cast.h:1413)
   12  _pulsar.cpython-311-darwin.so           0x108248eac void 
pybind11::cpp_function::initialize<pybind11::cpp_function::cpp_function<void, 
pulsar::Client, pulsar::ForkEvent, pybind11::name, pybind11::is_method, 
pybind11::sibling>(void (pulsar::Client::*)(pulsar::ForkEvent), pybind11::name 
const&, pybind11::is_method const&, pybind11::sibling 
const&)::'lambda'(pulsar::Client*, pulsar::ForkEvent), void, pulsar::Client*, 
pulsar::ForkEvent, pybind11::name, pybind11::is_method, 
pybind11::sibling>(void&&, pulsar::Client (*)(pulsar::ForkEvent), 
pybind11::name const&, pybind11::is_method const&, pybind11::sibling 
const&)::'lambda'(pybind11::detail::function_call&)::operator()(pybind11::detail::function_call&)
 const + 132 (pybind11.h:249)
   13  _pulsar.cpython-311-darwin.so           0x108248e0c void 
pybind11::cpp_function::initialize<pybind11::cpp_function::cpp_function<void, 
pulsar::Client, pulsar::ForkEvent, pybind11::name, pybind11::is_method, 
pybind11::sibling>(void (pulsar::Client::*)(pulsar::ForkEvent), pybind11::name 
const&, pybind11::is_method const&, pybind11::sibling 
const&)::'lambda'(pulsar::Client*, pulsar::ForkEvent), void, pulsar::Client*, 
pulsar::ForkEvent, pybind11::name, pybind11::is_method, 
pybind11::sibling>(void&&, pulsar::Client (*)(pulsar::ForkEvent), 
pybind11::name const&, pybind11::is_method const&, pybind11::sibling 
const&)::'lambda'(pybind11::detail::function_call&)::__invoke(pybind11::detail::function_call&)
 + 28 (pybind11.h:224)
   14  _pulsar.cpython-311-darwin.so           0x1081e0494 
pybind11::cpp_function::dispatcher(_object*, _object*, _object*) + 3960 
(pybind11.h:929)
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to