This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-python.git
The following commit(s) were added to refs/heads/main by this push:
new ce25b36 Release the GIL before any call to async methods (#123)
ce25b36 is described below
commit ce25b367f5b0d048e68b3c7328180c2dd675eef6
Author: Matteo Merli <[email protected]>
AuthorDate: Wed May 24 17:29:25 2023 -0700
Release the GIL before any call to async methods (#123)
Fix #122
When call an async method on Pulsar C++ client, we need to be releasing the
GIL to avoid a deadlock between that and the producer lock.
---
src/utils.cc | 11 ++++++++++-
src/utils.h | 12 ++++++++----
2 files changed, 18 insertions(+), 5 deletions(-)
diff --git a/src/utils.cc b/src/utils.cc
index 8ebc3f9..f45b801 100644
--- a/src/utils.cc
+++ b/src/utils.cc
@@ -21,7 +21,16 @@
void waitForAsyncResult(std::function<void(ResultCallback)> func) {
auto promise = std::make_shared<std::promise<Result>>();
- func([promise](Result result) { promise->set_value(result); });
+
+ {
+ // Always call the Pulsar C++ client methods without holding
+ // the GIL. This avoids deadlocks due the sequence of acquiring
+ // mutexes by different threads. eg:
+ // Thread-1: GIL -> producer.lock
+ // Thread-2: producer.lock -> GIL (In a callback)
+ py::gil_scoped_release release;
+ func([promise](Result result) { promise->set_value(result); });
+ }
internal::waitForResult(*promise);
}
diff --git a/src/utils.h b/src/utils.h
index bbe202e..910f0cd 100644
--- a/src/utils.h
+++ b/src/utils.h
@@ -49,10 +49,14 @@ inline T
waitForAsyncValue(std::function<void(std::function<void(Result, const T
auto resultPromise = std::make_shared<std::promise<Result>>();
auto valuePromise = std::make_shared<std::promise<T>>();
- func([resultPromise, valuePromise](Result result, const T& value) {
- valuePromise->set_value(value);
- resultPromise->set_value(result);
- });
+ {
+ py::gil_scoped_release release;
+
+ func([resultPromise, valuePromise](Result result, const T& value) {
+ valuePromise->set_value(value);
+ resultPromise->set_value(result);
+ });
+ }
internal::waitForResult(*resultPromise);
return valuePromise->get_future().get();