This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-python.git
The following commit(s) were added to refs/heads/main by this push:
new 0028893 Make acknowledge APIs synchronous and improve the documents
(#121)
0028893 is described below
commit 00288931bc04929aab9c2717cd6e6c7e2a9f65e2
Author: Yunze Xu <[email protected]>
AuthorDate: Thu May 25 02:21:41 2023 +0800
Make acknowledge APIs synchronous and improve the documents (#121)
Fixes https://github.com/apache/pulsar-client-python/issues/114
### Motivation
Currently the `acknowledge` and `acknowledge_cumulative` methods are both
asynchronous, so no exception is raised even if an error occurs.
For example, when acknowledging cumulatively on a consumer whose
consumer type is Shared or KeyShared, no error is reported.
### Modifications
- Make these methods synchronous so that they raise an exception if the
acknowledgment fails.
- Add `PulsarTest.test_acknowledge_failed` to test these failed cases.
- Improve the documentation to describe which exceptions can be raised
and in which cases.
---
pulsar/__init__.py | 10 ++++++++++
src/consumer.cc | 17 ++++++++---------
tests/pulsar_test.py | 23 +++++++++++++++++++++++
3 files changed, 41 insertions(+), 9 deletions(-)
diff --git a/pulsar/__init__.py b/pulsar/__init__.py
index f7c05e2..c85c6e3 100644
--- a/pulsar/__init__.py
+++ b/pulsar/__init__.py
@@ -1305,6 +1305,11 @@ class Consumer:
message:
The received message or message id.
+
+ Raises
+ ------
+ OperationNotSupported
+ if `message` is not allowed to acknowledge
"""
if isinstance(message, Message):
self._consumer.acknowledge(message._message)
@@ -1324,6 +1329,11 @@ class Consumer:
message:
The received message or message id.
+
+ Raises
+ ------
+ CumulativeAcknowledgementNotAllowedError
+ if the consumer type is ConsumerType.KeyShared or
ConsumerType.Shared
"""
if isinstance(message, Message):
self._consumer.acknowledge_cumulative(message._message)
diff --git a/src/consumer.cc b/src/consumer.cc
index 4b44775..67d2daa 100644
--- a/src/consumer.cc
+++ b/src/consumer.cc
@@ -50,11 +50,12 @@ Messages Consumer_batch_receive(Consumer& consumer) {
return msgs;
}
-void Consumer_acknowledge(Consumer& consumer, const Message& msg) {
consumer.acknowledgeAsync(msg, nullptr); }
+void Consumer_acknowledge(Consumer& consumer, const Message& msg) {
+ waitForAsyncResult([&](ResultCallback callback) {
consumer.acknowledgeAsync(msg, callback); });
+}
void Consumer_acknowledge_message_id(Consumer& consumer, const MessageId&
msgId) {
- Py_BEGIN_ALLOW_THREADS consumer.acknowledgeAsync(msgId, nullptr);
- Py_END_ALLOW_THREADS
+ waitForAsyncResult([&](ResultCallback callback) {
consumer.acknowledgeAsync(msgId, callback); });
}
void Consumer_negative_acknowledge(Consumer& consumer, const Message& msg) {
@@ -63,18 +64,16 @@ void Consumer_negative_acknowledge(Consumer& consumer,
const Message& msg) {
}
void Consumer_negative_acknowledge_message_id(Consumer& consumer, const
MessageId& msgId) {
- Py_BEGIN_ALLOW_THREADS consumer.negativeAcknowledge(msgId);
- Py_END_ALLOW_THREADS
+ waitForAsyncResult([&](ResultCallback callback) {
consumer.acknowledgeAsync(msgId, callback); });
}
void Consumer_acknowledge_cumulative(Consumer& consumer, const Message& msg) {
- Py_BEGIN_ALLOW_THREADS consumer.acknowledgeCumulativeAsync(msg, nullptr);
- Py_END_ALLOW_THREADS
+ waitForAsyncResult([&](ResultCallback callback) {
consumer.acknowledgeCumulativeAsync(msg, callback); });
}
void Consumer_acknowledge_cumulative_message_id(Consumer& consumer, const
MessageId& msgId) {
- Py_BEGIN_ALLOW_THREADS consumer.acknowledgeCumulativeAsync(msgId, nullptr);
- Py_END_ALLOW_THREADS
+ waitForAsyncResult(
+ [&](ResultCallback callback) {
consumer.acknowledgeCumulativeAsync(msgId, callback); });
}
void Consumer_close(Consumer& consumer) {
diff --git a/tests/pulsar_test.py b/tests/pulsar_test.py
index 00e2466..eeb2a6a 100755
--- a/tests/pulsar_test.py
+++ b/tests/pulsar_test.py
@@ -1437,6 +1437,29 @@ class PulsarTest(TestCase):
producer.flush()
client.close()
+ def test_acknowledge_failed(self):
+ client = Client(self.serviceUrl)
+ topic = 'test_acknowledge_failed'
+ producer = client.create_producer(topic)
+ consumer1 = client.subscribe(topic, 'sub1',
consumer_type=ConsumerType.Shared)
+ consumer2 = client.subscribe(topic, 'sub2',
consumer_type=ConsumerType.KeyShared)
+ msg_id = producer.send('hello'.encode())
+ msg1 = consumer1.receive()
+ with
self.assertRaises(pulsar.CumulativeAcknowledgementNotAllowedError):
+ consumer1.acknowledge_cumulative(msg1)
+ with
self.assertRaises(pulsar.CumulativeAcknowledgementNotAllowedError):
+ consumer1.acknowledge_cumulative(msg1.message_id())
+ msg2 = consumer2.receive()
+ with
self.assertRaises(pulsar.CumulativeAcknowledgementNotAllowedError):
+ consumer2.acknowledge_cumulative(msg2)
+ with
self.assertRaises(pulsar.CumulativeAcknowledgementNotAllowedError):
+ consumer2.acknowledge_cumulative(msg2.message_id())
+ consumer = client.subscribe([topic, topic + '-another'], 'sub')
+ # The message id does not have a topic name
+ with self.assertRaises(pulsar.OperationNotSupported):
+ consumer.acknowledge(msg_id)
+ client.close()
+
if __name__ == "__main__":
main()