merlimat commented on issue #127:
URL:
https://github.com/apache/pulsar-client-python/issues/127#issuecomment-1572866434
I tried to expose the manual hook in C++ client and call it from Python
wrapper.
So far I was not successful in not making it crash when
### C++ Diff
```diff
diff --git include/pulsar/Client.h include/pulsar/Client.h
index 3514934..d648612 100644
--- include/pulsar/Client.h
+++ include/pulsar/Client.h
@@ -46,6 +46,12 @@ class ClientImpl;
class PulsarFriend;
class PulsarWrapper;
+enum ForkEvent {
+ ForkPrepare,
+ ForkParent,
+ ForkChild
+};
+
class PULSAR_PUBLIC Client {
public:
/**
@@ -414,6 +420,26 @@ class PULSAR_PUBLIC Client {
void getSchemaInfoAsync(const std::string& topic, int64_t version,
std::function<void(Result, const SchemaInfo&)>
callback);
+ /**
+ * Notify the Pulsar client instance about an OS fork() operation.
+ * This will help ensure the state of threads and mutexes is kept
consistent
+ * after the fork is completed.
+ *
+ * Example of how to use this:
+ *
+ * <code>
+ * pulsarClient.notifyFork(pulsar::ForkPrepare);
+ * if (fork() == 0) {
+ * // This is the child process.
+ * pulsarClient.notifyFork(pulsar::ForkChild);
+ * } else {
+ * // This is the parent process.
+ * pulsarClient.NotifyFork(pulsar::ForkParent);
+ * }
+ * </code>
+ */
+ void notifyFork(ForkEvent forkEvent);
+
private:
Client(const std::string& serviceUrl, const ClientConfiguration&
clientConfiguration,
bool poolConnections);
diff --git lib/Client.cc lib/Client.cc
index 48c4a67..fa92a43 100644
--- lib/Client.cc
+++ lib/Client.cc
@@ -192,6 +192,8 @@ void Client::closeAsync(CloseCallback callback) {
impl_->closeAsync(callback); }
void Client::shutdown() { impl_->shutdown(); }
+void Client::notifyFork(ForkEvent forkEvent) {
impl_->notifyFork(forkEvent); }
+
uint64_t Client::getNumberOfProducers() { return
impl_->getNumberOfProducers(); }
uint64_t Client::getNumberOfConsumers() { return
impl_->getNumberOfConsumers(); }
diff --git lib/ClientImpl.cc lib/ClientImpl.cc
index a8fc24c..e912cff 100644
--- lib/ClientImpl.cc
+++ lib/ClientImpl.cc
@@ -772,4 +772,20 @@ std::string ClientImpl::getClientVersion(const
ClientConfiguration& clientConfig
return oss.str();
}
+void ClientImpl::notifyFork(ForkEvent forkEvent) {
+ Lock lock(mutex_);
+
+// if (ioExecutorProvider_) {
+// ioExecutorProvider_->notifyFork(forkEvent);
+// }
+
+ if (listenerExecutorProvider_) {
+ listenerExecutorProvider_->notifyFork(forkEvent);
+ }
+
+ if (partitionListenerExecutorProvider_) {
+ partitionListenerExecutorProvider_->notifyFork(forkEvent);
+ }
+}
+
} /* namespace pulsar */
diff --git lib/ClientImpl.h lib/ClientImpl.h
index 9ee7095..ce12fd2 100644
--- lib/ClientImpl.h
+++ lib/ClientImpl.h
@@ -124,6 +124,8 @@ class ClientImpl : public
std::enable_shared_from_this<ClientImpl> {
std::shared_ptr<std::atomic<uint64_t>> getRequestIdGenerator() const {
return requestIdGenerator_; }
+ void notifyFork(ForkEvent forkEvent);
+
friend class PulsarFriend;
private:
diff --git lib/ExecutorService.cc lib/ExecutorService.cc
index a0dff0b..fa2829e 100644
--- lib/ExecutorService.cc
+++ lib/ExecutorService.cc
@@ -20,6 +20,7 @@
#include "LogUtils.h"
#include "TimeUtils.h"
+#include "Latch.h"
DECLARE_LOG_OBJECT()
namespace pulsar {
@@ -157,4 +158,32 @@ void ExecutorServiceProvider::close(long timeoutMs) {
executor.reset();
}
}
+
+
+static void notifyForkOnIoService(ExecutorServicePtr executor,
boost::asio::io_service::fork_event event) {
+ Latch l(1);
+
+ executor->postWork([=, &l] {
+ executor->getIOService().notify_fork(event);
+ l.countdown();
+ });
+
+ l.wait();
+}
+
+void ExecutorServiceProvider::notifyFork(ForkEvent forkEvent) {
+ Lock lock(mutex_);
+
+ for (auto& executor : executors_) {
+ switch (forkEvent) {
+ case ForkPrepare:
+ notifyForkOnIoService(executor,
boost::asio::io_service::fork_prepare);
+ case ForkParent:
+ notifyForkOnIoService(executor,
boost::asio::io_service::fork_parent);
+ case ForkChild:
+ notifyForkOnIoService(executor,
boost::asio::io_service::fork_child);
+ }
+ }
+}
+
} // namespace pulsar
diff --git lib/ExecutorService.h lib/ExecutorService.h
index 4717ccb..d14de4c 100644
--- lib/ExecutorService.h
+++ lib/ExecutorService.h
@@ -32,6 +32,7 @@
#include <memory>
#include <mutex>
#include <thread>
+#include <pulsar/Client.h>
namespace pulsar {
typedef std::shared_ptr<boost::asio::ip::tcp::socket> SocketPtr;
@@ -93,6 +94,8 @@ class PULSAR_PUBLIC ExecutorServiceProvider {
// See TimeoutProcessor for the semantics of the parameter.
void close(long timeoutMs = 3000);
+ void notifyFork(ForkEvent forkEvent);
+
private:
typedef std::vector<ExecutorServicePtr> ExecutorList;
ExecutorList executors_;
```
### Python Diff:
```diff
diff --git pulsar/__init__.py pulsar/__init__.py
index dbf3d82..4cb55ad 100644
--- pulsar/__init__.py
+++ pulsar/__init__.py
@@ -58,6 +58,7 @@ from pulsar.functions.serde import SerDe, IdentitySerDe,
PickleSerDe
from pulsar import schema
_schema = schema
+import os
import re
_retype = type(re.compile('x'))
@@ -491,6 +492,14 @@ class Client:
conf.tls_validate_hostname(tls_validate_hostname)
self._client = _pulsar.Client(service_url, conf)
self._consumers = []
+ # Prepare to handle forks
+ os.register_at_fork(before=lambda:
self._notifyFork(_pulsar.ForkEvent.Prepare),
+ after_in_child=lambda:
self._notifyFork(_pulsar.ForkEvent.Child),
+ after_in_parent=lambda:
self._notifyFork(_pulsar.ForkEvent.Parent))
+
+ def _notifyFork(self, event):
+ print("NOTIFY FORK: %s" % event)
+ self._client.notifyFork(event)
@staticmethod
def _prepare_logger(logger):
diff --git src/client.cc src/client.cc
index 626ff9f..2369842 100644
--- src/client.cc
+++ src/client.cc
@@ -79,5 +79,6 @@ void export_client(py::module_& m) {
.def("get_topic_partitions", &Client_getTopicPartitions)
.def("get_schema_info", &Client_getSchemaInfo)
.def("close", &Client_close)
+ .def("notifyFork", &Client::notifyFork)
.def("shutdown", &Client::shutdown);
}
diff --git src/enums.cc src/enums.cc
index f61011f..3e63de7 100644
--- src/enums.cc
+++ src/enums.cc
@@ -124,4 +124,9 @@ void export_enums(py::module_& m) {
.value("Info", Logger::LEVEL_INFO)
.value("Warn", Logger::LEVEL_WARN)
.value("Error", Logger::LEVEL_ERROR);
+
+ enum_<ForkEvent>(m, "ForkEvent")
+ .value("Prepare", ForkPrepare)
+ .value("Parent", ForkParent)
+ .value("Child", ForkChild);
}
```
### Still crashing:
```
Thread 0 Crashed:: Dispatch queue: com.apple.main-thread
0 _pulsar.cpython-311-darwin.so 0x10857682c void
boost::asio::io_context::initiate_post::operator()<std::__1::function<void
()>&>(std::__1::function<void ()>&, boost::asio::io_context*) const + 184
(io_context.hpp:194)
1 _pulsar.cpython-311-darwin.so 0x108576768 void
boost::asio::detail::completion_handler_async_result<std::__1::function<void
()>, void ()>::initiate<boost::asio::io_context::initiate_post,
std::__1::function<void ()>&,
boost::asio::io_context*>(boost::asio::io_context::initiate_post&&,
std::__1::function<void ()>&, boost::asio::io_context*&&) + 44
(async_result.hpp:482)
2 _pulsar.cpython-311-darwin.so 0x108576730
boost::asio::constraint<detail::async_result_has_initiate_memfn<std::__1::function<void
()>&, void ()>::value,
decltype(async_result<std::__1::decay<std::__1::function<void ()>&>::type, void
()>::initiate(declval<boost::asio::io_context::initiate_post&&>(),
declval<std::__1::function<void ()>&>(),
declval<boost::asio::io_context*&&>()))>::type
boost::asio::async_initiate<std::__1::function<void ()>&, void (),
boost::asio::io_context::initiate_post,
boost::asio::io_context*>(boost::asio::io_context::initiate_post&&,
std::__1::function<void ()>&, boost::asio::io_context*&&) + 40
(async_result.hpp:895)
3 _pulsar.cpython-311-darwin.so 0x108562098
decltype(async_initiate<std::__1::function<void ()>&, void
()>(decltype(__declval<std::__1::function<void ()>&>(0))
std::__1::declval<boost::asio::io_context::initiate_post>()(), fp, this))
boost::asio::io_context::post<std::__1::function<void
()>&>(std::__1::function<void ()>&) + 44 (io_context.hpp:206)
4 _pulsar.cpython-311-darwin.so 0x108562060
pulsar::ExecutorService::postWork(std::__1::function<void ()>) + 28
(ExecutorService.cc:130)
5 _pulsar.cpython-311-darwin.so 0x1085626dc
pulsar::notifyForkOnIoService(std::__1::shared_ptr<pulsar::ExecutorService>,
boost::asio::execution_context::fork_event) + 128 (ExecutorService.cc:166)
6 _pulsar.cpython-311-darwin.so 0x108562574
pulsar::ExecutorServiceProvider::notifyFork(pulsar::ForkEvent) + 200
(ExecutorService.cc:180)
7 _pulsar.cpython-311-darwin.so 0x108446e70
pulsar::ClientImpl::notifyFork(pulsar::ForkEvent) + 80 (ClientImpl.cc:783)
8 _pulsar.cpython-311-darwin.so 0x108386860
pulsar::Client::notifyFork(pulsar::ForkEvent) + 36 (Client.cc:195)
9 _pulsar.cpython-311-darwin.so 0x108249460
pybind11::cpp_function::cpp_function<void, pulsar::Client, pulsar::ForkEvent,
pybind11::name, pybind11::is_method, pybind11::sibling>(void
(pulsar::Client::*)(pulsar::ForkEvent), pybind11::name const&,
pybind11::is_method const&, pybind11::sibling
const&)::'lambda'(pulsar::Client*,
pulsar::ForkEvent)::operator()(pulsar::Client*, pulsar::ForkEvent) const + 120
(pybind11.h:109)
10 _pulsar.cpython-311-darwin.so 0x1082493dc void
pybind11::detail::argument_loader<pulsar::Client*,
pulsar::ForkEvent>::call_impl<void, pybind11::cpp_function::cpp_function<void,
pulsar::Client, pulsar::ForkEvent, pybind11::name, pybind11::is_method,
pybind11::sibling>(void (pulsar::Client::*)(pulsar::ForkEvent), pybind11::name
const&, pybind11::is_method const&, pybind11::sibling
const&)::'lambda'(pulsar::Client*, pulsar::ForkEvent)&, 0ul, 1ul,
pybind11::detail::void_type>(pulsar::Client&&,
pybind11::detail::index_sequence<0ul, 1ul>, pybind11::detail::void_type&&) && +
88 (cast.h:1439)
11 _pulsar.cpython-311-darwin.so 0x108248f78
std::__1::enable_if<std::is_void<void>::value,
pybind11::detail::void_type>::type
pybind11::detail::argument_loader<pulsar::Client*,
pulsar::ForkEvent>::call<void, pybind11::detail::void_type,
pybind11::cpp_function::cpp_function<void, pulsar::Client, pulsar::ForkEvent,
pybind11::name, pybind11::is_method, pybind11::sibling>(void
(pulsar::Client::*)(pulsar::ForkEvent), pybind11::name const&,
pybind11::is_method const&, pybind11::sibling
const&)::'lambda'(pulsar::Client*,
pulsar::ForkEvent)&>(pybind11::cpp_function::cpp_function<void, pulsar::Client,
pulsar::ForkEvent, pybind11::name, pybind11::is_method, pybind11::sibling>(void
(pulsar::Client::*)(pulsar::ForkEvent), pybind11::name const&,
pybind11::is_method const&, pybind11::sibling
const&)::'lambda'(pulsar::Client*, pulsar::ForkEvent)&) && + 36 (cast.h:1413)
12 _pulsar.cpython-311-darwin.so 0x108248eac void
pybind11::cpp_function::initialize<pybind11::cpp_function::cpp_function<void,
pulsar::Client, pulsar::ForkEvent, pybind11::name, pybind11::is_method,
pybind11::sibling>(void (pulsar::Client::*)(pulsar::ForkEvent), pybind11::name
const&, pybind11::is_method const&, pybind11::sibling
const&)::'lambda'(pulsar::Client*, pulsar::ForkEvent), void, pulsar::Client*,
pulsar::ForkEvent, pybind11::name, pybind11::is_method,
pybind11::sibling>(void&&, pulsar::Client (*)(pulsar::ForkEvent),
pybind11::name const&, pybind11::is_method const&, pybind11::sibling
const&)::'lambda'(pybind11::detail::function_call&)::operator()(pybind11::detail::function_call&)
const + 132 (pybind11.h:249)
13 _pulsar.cpython-311-darwin.so 0x108248e0c void
pybind11::cpp_function::initialize<pybind11::cpp_function::cpp_function<void,
pulsar::Client, pulsar::ForkEvent, pybind11::name, pybind11::is_method,
pybind11::sibling>(void (pulsar::Client::*)(pulsar::ForkEvent), pybind11::name
const&, pybind11::is_method const&, pybind11::sibling
const&)::'lambda'(pulsar::Client*, pulsar::ForkEvent), void, pulsar::Client*,
pulsar::ForkEvent, pybind11::name, pybind11::is_method,
pybind11::sibling>(void&&, pulsar::Client (*)(pulsar::ForkEvent),
pybind11::name const&, pybind11::is_method const&, pybind11::sibling
const&)::'lambda'(pybind11::detail::function_call&)::__invoke(pybind11::detail::function_call&)
+ 28 (pybind11.h:224)
14 _pulsar.cpython-311-darwin.so 0x1081e0494
pybind11::cpp_function::dispatcher(_object*, _object*, _object*) + 3960
(pybind11.h:929)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]