This is an automated email from the ASF dual-hosted git repository.

BewareMyPower pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 5d38ac9  fix(asyncio): Avoid InvalidStateError on late callbacks after 
cancel (#297)
5d38ac9 is described below

commit 5d38ac94c87df9c9de34e2ff24047c5b4ff560b0
Author: Vibhav Singamshetty <[email protected]>
AuthorDate: Tue May 5 18:44:31 2026 -0700

    fix(asyncio): Avoid InvalidStateError on late callbacks after cancel (#297)
---
 pulsar/asyncio.py     |  2 ++
 tests/asyncio_test.py | 21 +++++++++++++++++++++
 2 files changed, 23 insertions(+)

diff --git a/pulsar/asyncio.py b/pulsar/asyncio.py
index 7fa7c3d..2db4935 100644
--- a/pulsar/asyncio.py
+++ b/pulsar/asyncio.py
@@ -828,6 +828,8 @@ class Client:
 
 def _set_future(future: asyncio.Future, result: _pulsar.Result, value: Any):
     def complete():
+        if future.done():
+            return
         if result == _pulsar.Result.Ok:
             future.set_result(value)
         else:
diff --git a/tests/asyncio_test.py b/tests/asyncio_test.py
index 8a441c4..3cc1078 100644
--- a/tests/asyncio_test.py
+++ b/tests/asyncio_test.py
@@ -39,6 +39,7 @@ from pulsar.asyncio import (  # pylint: disable=import-error
     Consumer,
     Producer,
     PulsarException,
+    _set_future,
 )
 from pulsar.schema import (  # pylint: disable=import-error
     AvroSchema,
@@ -484,5 +485,25 @@ class AsyncioTest(IsolatedAsyncioTestCase):
         self.assertEqual(msg.value().int_field, 42)
 
 
+class AsyncioSetFutureTest(IsolatedAsyncioTestCase):
+    """Tests for asyncio bridge helpers (no live Pulsar broker)."""
+
+    async def test_set_future_noop_when_future_cancelled(self):
+        loop = asyncio.get_running_loop()
+        fut = loop.create_future()
+        fut.cancel()
+        _set_future(fut, _pulsar.Result.Ok, None)
+        await asyncio.sleep(0)
+        self.assertTrue(fut.cancelled())
+
+    async def test_set_future_noop_when_future_already_resolved(self):
+        loop = asyncio.get_running_loop()
+        fut = loop.create_future()
+        fut.set_result("first")
+        _set_future(fut, _pulsar.Result.Ok, "late")
+        await asyncio.sleep(0)
+        self.assertEqual(fut.result(), "first")
+
+
 if __name__ == '__main__':
     main()

Reply via email to