This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 0028893  Make acknowledge APIs synchronous and improve the documents 
(#121)
0028893 is described below

commit 00288931bc04929aab9c2717cd6e6c7e2a9f65e2
Author: Yunze Xu <[email protected]>
AuthorDate: Thu May 25 02:21:41 2023 +0800

    Make acknowledge APIs synchronous and improve the documents (#121)
    
    Fixes https://github.com/apache/pulsar-client-python/issues/114
    
    ### Motivation
    
    Currently the `acknowledge` and `acknowledge_cumulative` methods are all
    asynchronous. Even if any error happened, no exception would be raised.
    For example, when acknowledging cumulatively on a consumer whose
    consumer type is Shared or KeyShared, no error happens.
    
    ### Modifications
    
    - Change these methods to synchronous and raise exceptions if the
      acknowledgment failed.
    - Add `PulsarTest.test_acknowledge_failed` to test these failed cases.
    - Improve the documents to describe which exceptions could be raised in
      which cases.
---
 pulsar/__init__.py   | 10 ++++++++++
 src/consumer.cc      | 17 ++++++++---------
 tests/pulsar_test.py | 23 +++++++++++++++++++++++
 3 files changed, 41 insertions(+), 9 deletions(-)

diff --git a/pulsar/__init__.py b/pulsar/__init__.py
index f7c05e2..c85c6e3 100644
--- a/pulsar/__init__.py
+++ b/pulsar/__init__.py
@@ -1305,6 +1305,11 @@ class Consumer:
 
         message:
             The received message or message id.
+
+        Raises
+        ------
+        OperationNotSupported
+             if `message` is not allowed to acknowledge
         """
         if isinstance(message, Message):
             self._consumer.acknowledge(message._message)
@@ -1324,6 +1329,11 @@ class Consumer:
 
         message:
             The received message or message id.
+
+        Raises
+        ------
+        CumulativeAcknowledgementNotAllowedError
+            if the consumer type is ConsumerType.KeyShared or 
ConsumerType.Shared
         """
         if isinstance(message, Message):
             self._consumer.acknowledge_cumulative(message._message)
diff --git a/src/consumer.cc b/src/consumer.cc
index 4b44775..67d2daa 100644
--- a/src/consumer.cc
+++ b/src/consumer.cc
@@ -50,11 +50,12 @@ Messages Consumer_batch_receive(Consumer& consumer) {
     return msgs;
 }
 
-void Consumer_acknowledge(Consumer& consumer, const Message& msg) { 
consumer.acknowledgeAsync(msg, nullptr); }
+void Consumer_acknowledge(Consumer& consumer, const Message& msg) {
+    waitForAsyncResult([&](ResultCallback callback) { 
consumer.acknowledgeAsync(msg, callback); });
+}
 
 void Consumer_acknowledge_message_id(Consumer& consumer, const MessageId& 
msgId) {
-    Py_BEGIN_ALLOW_THREADS consumer.acknowledgeAsync(msgId, nullptr);
-    Py_END_ALLOW_THREADS
+    waitForAsyncResult([&](ResultCallback callback) { 
consumer.acknowledgeAsync(msgId, callback); });
 }
 
 void Consumer_negative_acknowledge(Consumer& consumer, const Message& msg) {
@@ -63,18 +64,16 @@ void Consumer_negative_acknowledge(Consumer& consumer, 
const Message& msg) {
 }
 
 void Consumer_negative_acknowledge_message_id(Consumer& consumer, const 
MessageId& msgId) {
-    Py_BEGIN_ALLOW_THREADS consumer.negativeAcknowledge(msgId);
-    Py_END_ALLOW_THREADS
+    waitForAsyncResult([&](ResultCallback callback) { 
consumer.acknowledgeAsync(msgId, callback); });
 }
 
 void Consumer_acknowledge_cumulative(Consumer& consumer, const Message& msg) {
-    Py_BEGIN_ALLOW_THREADS consumer.acknowledgeCumulativeAsync(msg, nullptr);
-    Py_END_ALLOW_THREADS
+    waitForAsyncResult([&](ResultCallback callback) { 
consumer.acknowledgeCumulativeAsync(msg, callback); });
 }
 
 void Consumer_acknowledge_cumulative_message_id(Consumer& consumer, const 
MessageId& msgId) {
-    Py_BEGIN_ALLOW_THREADS consumer.acknowledgeCumulativeAsync(msgId, nullptr);
-    Py_END_ALLOW_THREADS
+    waitForAsyncResult(
+        [&](ResultCallback callback) { 
consumer.acknowledgeCumulativeAsync(msgId, callback); });
 }
 
 void Consumer_close(Consumer& consumer) {
diff --git a/tests/pulsar_test.py b/tests/pulsar_test.py
index 00e2466..eeb2a6a 100755
--- a/tests/pulsar_test.py
+++ b/tests/pulsar_test.py
@@ -1437,6 +1437,29 @@ class PulsarTest(TestCase):
         producer.flush()
         client.close()
 
+    def test_acknowledge_failed(self):
+        client = Client(self.serviceUrl)
+        topic = 'test_acknowledge_failed'
+        producer = client.create_producer(topic)
+        consumer1 = client.subscribe(topic, 'sub1', 
consumer_type=ConsumerType.Shared)
+        consumer2 = client.subscribe(topic, 'sub2', 
consumer_type=ConsumerType.KeyShared)
+        msg_id = producer.send('hello'.encode())
+        msg1 = consumer1.receive()
+        with 
self.assertRaises(pulsar.CumulativeAcknowledgementNotAllowedError):
+            consumer1.acknowledge_cumulative(msg1)
+        with 
self.assertRaises(pulsar.CumulativeAcknowledgementNotAllowedError):
+            consumer1.acknowledge_cumulative(msg1.message_id())
+        msg2 = consumer2.receive()
+        with 
self.assertRaises(pulsar.CumulativeAcknowledgementNotAllowedError):
+            consumer2.acknowledge_cumulative(msg2)
+        with 
self.assertRaises(pulsar.CumulativeAcknowledgementNotAllowedError):
+            consumer2.acknowledge_cumulative(msg2.message_id())
+        consumer = client.subscribe([topic, topic + '-another'], 'sub')
+        # The message id does not have a topic name
+        with self.assertRaises(pulsar.OperationNotSupported):
+            consumer.acknowledge(msg_id)
+        client.close()
+
 
 if __name__ == "__main__":
     main()

Reply via email to