This is an automated email from the ASF dual-hosted git repository.
BewareMyPower pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-python.git
The following commit(s) were added to refs/heads/main by this push:
new 5d38ac9 fix(asyncio): Avoid InvalidStateError on late callbacks after
cancel (#297)
5d38ac9 is described below
commit 5d38ac94c87df9c9de34e2ff24047c5b4ff560b0
Author: Vibhav Singamshetty <[email protected]>
AuthorDate: Tue May 5 18:44:31 2026 -0700
fix(asyncio): Avoid InvalidStateError on late callbacks after cancel (#297)
---
pulsar/asyncio.py | 2 ++
tests/asyncio_test.py | 21 +++++++++++++++++++++
2 files changed, 23 insertions(+)
diff --git a/pulsar/asyncio.py b/pulsar/asyncio.py
index 7fa7c3d..2db4935 100644
--- a/pulsar/asyncio.py
+++ b/pulsar/asyncio.py
@@ -828,6 +828,8 @@ class Client:
def _set_future(future: asyncio.Future, result: _pulsar.Result, value: Any):
def complete():
+ if future.done():
+ return
if result == _pulsar.Result.Ok:
future.set_result(value)
else:
diff --git a/tests/asyncio_test.py b/tests/asyncio_test.py
index 8a441c4..3cc1078 100644
--- a/tests/asyncio_test.py
+++ b/tests/asyncio_test.py
@@ -39,6 +39,7 @@ from pulsar.asyncio import ( # pylint: disable=import-error
Consumer,
Producer,
PulsarException,
+ _set_future,
)
from pulsar.schema import ( # pylint: disable=import-error
AvroSchema,
@@ -484,5 +485,25 @@ class AsyncioTest(IsolatedAsyncioTestCase):
self.assertEqual(msg.value().int_field, 42)
+class AsyncioSetFutureTest(IsolatedAsyncioTestCase):
+ """Tests for asyncio bridge helpers (no live Pulsar broker)."""
+
+ async def test_set_future_noop_when_future_cancelled(self):
+ loop = asyncio.get_running_loop()
+ fut = loop.create_future()
+ fut.cancel()
+ _set_future(fut, _pulsar.Result.Ok, None)
+ await asyncio.sleep(0)
+ self.assertTrue(fut.cancelled())
+
+ async def test_set_future_noop_when_future_already_resolved(self):
+ loop = asyncio.get_running_loop()
+ fut = loop.create_future()
+ fut.set_result("first")
+ _set_future(fut, _pulsar.Result.Ok, "late")
+ await asyncio.sleep(0)
+ self.assertEqual(fut.result(), "first")
+
+
if __name__ == '__main__':
main()