This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-python.git


The following commit(s) were added to refs/heads/main by this push:
     new af27edd  feat: add get partitioned topic names (#289)
af27edd is described below

commit af27edd12417f6eae1013d26465b0dc21cc0423c
Author: Nikolas Achatz <[email protected]>
AuthorDate: Sun Feb 1 19:48:40 2026 -0800

    feat: add get partitioned topic names (#289)
---
 pulsar/asyncio.py     | 26 +++++++++++++++++++++
 src/client.cc         |  6 +++++
 tests/asyncio_test.py | 63 +++++++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 95 insertions(+)

diff --git a/pulsar/asyncio.py b/pulsar/asyncio.py
index a1ca0c0..a429cf8 100644
--- a/pulsar/asyncio.py
+++ b/pulsar/asyncio.py
@@ -39,6 +39,7 @@ from _pulsar import (
     ConsumerCryptoFailureAction,
 )
 import pulsar
+from pulsar import _check_type
 
 class PulsarException(BaseException):
     """
@@ -775,6 +776,31 @@ class Client:
 
         schema.attach_client(self._client)
         return Consumer(await future, schema)
+    
+    async def get_topic_partitions(self, topic: str) -> List[str]:
+        """
+        Get the list of partitions for a given topic in asynchronous mode.
+
+        If the topic is partitioned, this will return a list of partition names. If the topic is not partitioned, the returned list will contain the topic name itself.
+
+        This can be used to discover the partitions and create Reader, Consumer or Producer instances directly on a particular partition.
+
+        Parameters
+        ----------
+
+        topic: str
+            the topic name to lookup
+
+        Returns
+        -------
+        list
+            a list of partition names
+        """
+        _check_type(str, topic, 'topic')
+        future = asyncio.get_running_loop().create_future()
+        self._client.get_topic_partitions_async(topic, functools.partial(_set_future, future))
+        id = await future
+        return id
 
     async def close(self) -> None:
         """
diff --git a/src/client.cc b/src/client.cc
index 64056df..d77938f 100644
--- a/src/client.cc
+++ b/src/client.cc
@@ -65,6 +65,11 @@ std::vector<std::string> Client_getTopicPartitions(Client& client, const std::st
         [&](GetPartitionsCallback callback) { client.getPartitionsForTopicAsync(topic, callback); });
 }
 
+void Client_getTopicPartitionsAsync(Client &client, const std::string& topic, GetPartitionsCallback callback) {
+    py::gil_scoped_release release;
+    client.getPartitionsForTopicAsync(topic, callback);
+}
+
 SchemaInfo Client_getSchemaInfo(Client& client, const std::string& topic, int64_t version) {
     return waitForAsyncValue<SchemaInfo>([&](std::function<void(Result, const SchemaInfo&)> callback) {
         client.getSchemaInfoAsync(topic, version, callback);
@@ -119,6 +124,7 @@ void export_client(py::module_& m) {
         .def("get_schema_info", &Client_getSchemaInfo)
         .def("close", &Client_close)
         .def("close_async", &Client_closeAsync)
+        .def("get_topic_partitions_async", &Client_getTopicPartitionsAsync)
         .def("subscribe_async", &Client_subscribeAsync)
         .def("subscribe_async_topics", &Client_subscribeAsync_topics)
         .def("subscribe_async_pattern", &Client_subscribeAsync_pattern)
diff --git a/tests/asyncio_test.py b/tests/asyncio_test.py
index 4440809..32371cb 100644
--- a/tests/asyncio_test.py
+++ b/tests/asyncio_test.py
@@ -47,7 +47,29 @@ from pulsar.schema import (  # pylint: disable=import-error
     String,
 )
 
+from urllib.request import urlopen, Request
+
 SERVICE_URL = 'pulsar://localhost:6650'
+ADMIN_URL = "http://localhost:8080"
+TIMEOUT_MS = 10000  # Do not wait forever in tests
+
+def doHttpPost(url, data):
+    req = Request(url, data.encode())
+    req.add_header("Content-Type", "application/json")
+    urlopen(req)
+
+def doHttpPut(url, data):
+    try:
+        req = Request(url, data.encode())
+        req.add_header("Content-Type", "application/json")
+        req.get_method = lambda: "PUT"
+        urlopen(req)
+    except Exception as ex:
+        # ignore conflicts exception to have test idempotency
+        if "409" in str(ex):
+            pass
+        else:
+            raise ex
 
 class AsyncioTest(IsolatedAsyncioTestCase):
     """Test cases for asyncio Pulsar client."""
@@ -133,6 +155,47 @@ class AsyncioTest(IsolatedAsyncioTestCase):
         self.assertEqual(msg_id0.batch_index(), 0)
         self.assertEqual(msg_id1.batch_index(), 1)
 
+    async def test_get_topics_partitions(self):
+        topic_partitioned = "persistent://public/default/test_get_topics_partitions_async"
+        topic_non_partitioned = "persistent://public/default/test_get_topics_async_not-partitioned"
+
+        url1 = ADMIN_URL + "/admin/v2/persistent/public/default/test_get_topics_partitions_async/partitions"
+        doHttpPut(url1, "3")
+
+        self.assertEqual(
+            await self._client.get_topic_partitions(topic_partitioned),
+            [
+                "persistent://public/default/test_get_topics_partitions_async-partition-0",
+                "persistent://public/default/test_get_topics_partitions_async-partition-1",
+                "persistent://public/default/test_get_topics_partitions_async-partition-2",
+            ],
+        )
+        self.assertEqual(await self._client.get_topic_partitions(topic_non_partitioned), [topic_non_partitioned])
+
+    async def test_get_partitioned_topic_name(self):
+        url1 = ADMIN_URL + "/admin/v2/persistent/public/default/partitioned_topic_name_test/partitions"
+        doHttpPut(url1, "3")
+
+        partitions = [
+            "persistent://public/default/partitioned_topic_name_test-partition-0",
+            "persistent://public/default/partitioned_topic_name_test-partition-1",
+            "persistent://public/default/partitioned_topic_name_test-partition-2",
+        ]
+        self.assertEqual(
+            await self._client.get_topic_partitions("persistent://public/default/partitioned_topic_name_test"), partitions
+        )
+
+        consumer = await self._client.subscribe(
+            "persistent://public/default/partitioned_topic_name_test",
+            "partitioned_topic_name_test_sub",
+            consumer_type=pulsar.ConsumerType.Shared,
+        )
+        producer = await self._client.create_producer("persistent://public/default/partitioned_topic_name_test")
+        await producer.send(b"hello")
+
+        msg = await asyncio.wait_for(consumer.receive(), TIMEOUT_MS / 1000)
+        self.assertTrue(msg.topic_name() in partitions)
+
     async def test_create_producer_failure(self):
         try:
             await 
self._client.create_producer('tenant/ns/asyncio-test-send-failure')

Reply via email to