This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-python.git


The following commit(s) were added to refs/heads/main by this push:
     new cff12ea  Fixed deadlock in producer.send_async (#87)
cff12ea is described below

commit cff12ead95470bff0cc5fd9cb429b814af4b4d21
Author: Matteo Merli <[email protected]>
AuthorDate: Tue Jan 31 17:56:41 2023 -0800

    Fixed deadlock in producer.send_async (#87)
    
    Fix #84
    
    Release the GIL while calling `producer.sendAsync()` to avoid a deadlock 
when PyBind is triggering the Python callback.
    
    
      * Main Thread
         1. Holds the Python GIL
         2. Calls `producer.send_async()`
         3. Tries to acquire internal `ClientConnection` lock
    
     * Pulsar client internal thread
         1. Holds lock on `ClientConnection`
         2. Receives ack from the broker
         3. Triggers callback
         4. PyBind11 acquires GIL <---- Deadlock
    
    The problem is the different behavior in PyBind from Boost::Python.
    
    We always need to make sure we release the GIL before making any call to 
C++ that potentially acquires any mutexes.
---
 src/consumer.cc      | 10 ++++++++++
 src/producer.cc      | 12 +++++++++++-
 tests/pulsar_test.py | 14 ++++++++++++++
 3 files changed, 35 insertions(+), 1 deletion(-)

diff --git a/src/consumer.cc b/src/consumer.cc
index a77bb50..972bd0b 100644
--- a/src/consumer.cc
+++ b/src/consumer.cc
@@ -59,23 +59,33 @@ Messages Consumer_batch_receive(Consumer& consumer) {
 void Consumer_acknowledge(Consumer& consumer, const Message& msg) { 
consumer.acknowledgeAsync(msg, nullptr); }
 
 void Consumer_acknowledge_message_id(Consumer& consumer, const MessageId& 
msgId) {
+    Py_BEGIN_ALLOW_THREADS
     consumer.acknowledgeAsync(msgId, nullptr);
+    Py_END_ALLOW_THREADS
 }
 
 void Consumer_negative_acknowledge(Consumer& consumer, const Message& msg) {
+    Py_BEGIN_ALLOW_THREADS
     consumer.negativeAcknowledge(msg);
+    Py_END_ALLOW_THREADS
 }
 
 void Consumer_negative_acknowledge_message_id(Consumer& consumer, const 
MessageId& msgId) {
+    Py_BEGIN_ALLOW_THREADS
     consumer.negativeAcknowledge(msgId);
+    Py_END_ALLOW_THREADS
 }
 
 void Consumer_acknowledge_cumulative(Consumer& consumer, const Message& msg) {
+    Py_BEGIN_ALLOW_THREADS
     consumer.acknowledgeCumulativeAsync(msg, nullptr);
+    Py_END_ALLOW_THREADS
 }
 
 void Consumer_acknowledge_cumulative_message_id(Consumer& consumer, const 
MessageId& msgId) {
+    Py_BEGIN_ALLOW_THREADS
     consumer.acknowledgeCumulativeAsync(msgId, nullptr);
+    Py_END_ALLOW_THREADS
 }
 
 void Consumer_close(Consumer& consumer) {
diff --git a/src/producer.cc b/src/producer.cc
index bba262a..1dd5a76 100644
--- a/src/producer.cc
+++ b/src/producer.cc
@@ -34,6 +34,16 @@ MessageId Producer_send(Producer& producer, const Message& 
message) {
     return messageId;
 }
 
+void Producer_sendAsync(Producer& producer, const Message& msg, SendCallback 
callback) {
+    Py_BEGIN_ALLOW_THREADS
+    producer.sendAsync(msg, callback);
+    Py_END_ALLOW_THREADS
+
+    if (PyErr_CheckSignals() == -1) {
+        PyErr_SetInterrupt();
+    }
+}
+
 void Producer_flush(Producer& producer) {
     waitForAsyncResult([&](ResultCallback callback) { 
producer.flushAsync(callback); });
 }
@@ -67,7 +77,7 @@ void export_producer(py::module_& m) {
              "This method is equivalent to asyncSend() and wait until the 
callback is triggered.\n"
              "\n"
              "@param msg message to publish\n")
-        .def("send_async", &Producer::sendAsync)
+        .def("send_async", &Producer_sendAsync)
         .def("flush", &Producer_flush,
              "Flush all the messages buffered in the client and wait until all 
messages have been\n"
              "successfully persisted\n")
diff --git a/tests/pulsar_test.py b/tests/pulsar_test.py
index feba877..00e2466 100755
--- a/tests/pulsar_test.py
+++ b/tests/pulsar_test.py
@@ -1424,5 +1424,19 @@ class PulsarTest(TestCase):
         with self.assertRaises(RuntimeError):
             AuthenticationBasic(auth_params_string='invalid auth params')
 
+    def test_send_async_no_deadlock(self):
+        client = Client(self.serviceUrl)
+        producer = client.create_producer('test_send_async_no_deadlock')
+
+        def send_callback(res, msg):
+            print(f"Message '{msg}' published res={res}")
+
+        for i in range(30):
+            producer.send_async(f"Hello-{i}".encode('utf-8'), 
callback=send_callback)
+
+        producer.flush()
+        client.close()
+
+
 if __name__ == "__main__":
     main()

Reply via email to