This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 36d6fd6  feat: add individual negative acknowledgement for async consumer (#282)
36d6fd6 is described below

commit 36d6fd63478fbf9c5fa348bdd14cc913c2b98bc1
Author: Nikolas Achatz <[email protected]>
AuthorDate: Sat Jan 17 03:04:36 2026 -0700

    feat: add individual negative acknowledgement for async consumer (#282)
---
 pulsar/asyncio.py     | 25 +++++++++++++++++++++++++
 src/consumer.cc       | 12 ++++++++++++
 tests/asyncio_test.py | 30 ++++++++++++++++++++++++++++++
 3 files changed, 67 insertions(+)

diff --git a/pulsar/asyncio.py b/pulsar/asyncio.py
index 5c3178a..064e353 100644
--- a/pulsar/asyncio.py
+++ b/pulsar/asyncio.py
@@ -320,6 +320,31 @@ class Consumer:
         )
         await future
 
+    async def negative_acknowledge(
+        self,
+        message: Union[pulsar.Message, pulsar.MessageId, _pulsar.Message, _pulsar.MessageId]
+    ) -> None:
+        """
+        Acknowledge the failure to process a single message asynchronously.
+
+        When a message is "negatively acked" it will be marked for redelivery after
+        some fixed delay. The delay is configurable when constructing the consumer
+        with {@link ConsumerConfiguration#setNegativeAckRedeliveryDelayMs}.
+        This call is not blocking.
+        
+        Parameters
+        ----------
+        message:
+            The received message or message id.
+        """
+        if isinstance(message, pulsar.Message):
+            msg = message._message
+        elif isinstance(message, pulsar.MessageId):
+            msg = message._msg_id
+        else:
+            msg = message
+        await asyncio.to_thread(self._consumer.negative_acknowledge, msg)
+
     async def unsubscribe(self) -> None:
         """
         Unsubscribe the current consumer from the topic asynchronously.
diff --git a/src/consumer.cc b/src/consumer.cc
index f1d7367..fa52720 100644
--- a/src/consumer.cc
+++ b/src/consumer.cc
@@ -133,6 +133,16 @@ void Consumer_acknowledgeCumulativeAsync_message_id(Consumer& consumer, const Me
     consumer.acknowledgeCumulativeAsync(msgId, callback);
 }
 
+void Consumer_negative_acknowledgeAsync(Consumer& consumer, const Message& msg, ResultCallback callback) {
+    py::gil_scoped_release release;
+    consumer.negativeAcknowledge(msg);
+}
+
+void Consumer_negative_acknowledgeAsync_message_id(Consumer& consumer, const MessageId& msgId, ResultCallback callback) {
+    py::gil_scoped_release release;
+    consumer.negativeAcknowledge(msgId);
+}
+
 void Consumer_closeAsync(Consumer& consumer, ResultCallback callback) {
     py::gil_scoped_release release;
     consumer.closeAsync(callback);
@@ -183,6 +193,8 @@ void export_consumer(py::module_& m) {
         .def("acknowledge_async", &Consumer_acknowledgeAsync_message_id)
         .def("acknowledge_cumulative_async", &Consumer_acknowledgeCumulativeAsync)
         .def("acknowledge_cumulative_async", &Consumer_acknowledgeCumulativeAsync_message_id)
+        .def("negative_acknowledge_async", &Consumer_negative_acknowledgeAsync)
+        .def("negative_acknowledge_async", &Consumer_negative_acknowledgeAsync_message_id)
         .def("close_async", &Consumer_closeAsync)
         .def("unsubscribe_async", &Consumer_unsubscribeAsync)
         .def("seek_async", &Consumer_seekAsync)
diff --git a/tests/asyncio_test.py b/tests/asyncio_test.py
index 048dc43..66ff0fd 100644
--- a/tests/asyncio_test.py
+++ b/tests/asyncio_test.py
@@ -203,6 +203,36 @@ class AsyncioTest(IsolatedAsyncioTestCase):
         msg = await consumer.receive()
         self.assertEqual(msg.data(), b'msg-3')
 
+    async def test_consumer_negative_acknowledge(self):
+        topic = f'asyncio-test-consumer-negative-ack-{time.time()}'
+        sub = 'sub'
+        consumer = await self._client.subscribe(topic, sub,
+                                                consumer_type=pulsar.ConsumerType.Shared,
+                                                negative_ack_redelivery_delay_ms=100)
+        
+        producer = await self._client.create_producer(topic)
+        await self._prepare_messages(producer)
+        msgs = []
+        for _ in range(5):
+            msg = await consumer.receive()
+            msgs.append(msg)
+
+        await consumer.acknowledge(msgs[1])
+        await consumer.acknowledge(msgs[3])
+        
+        await consumer.negative_acknowledge(msgs[0])
+        await consumer.negative_acknowledge(msgs[2])
+        await consumer.negative_acknowledge(msgs[4])
+        await asyncio.sleep(0.2)
+        
+        received = []
+        for _ in range(3):
+            msg = await consumer.receive()
+            received.append(msg.data())
+        
+        self.assertEqual(sorted(received), [b'msg-0', b'msg-2', b'msg-4'])
+        await consumer.close()
+
     async def test_multi_topic_consumer(self):
         topics = ['asyncio-test-multi-topic-1', 'asyncio-test-multi-topic-2']
         producers = []

Reply via email to