This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-python.git
The following commit(s) were added to refs/heads/main by this push:
new af27edd feat: add get partitioned topic names (#289)
af27edd is described below
commit af27edd12417f6eae1013d26465b0dc21cc0423c
Author: Nikolas Achatz <[email protected]>
AuthorDate: Sun Feb 1 19:48:40 2026 -0800
feat: add get partitioned topic names (#289)
---
pulsar/asyncio.py | 26 +++++++++++++++++++++
src/client.cc | 6 +++++
tests/asyncio_test.py | 63 +++++++++++++++++++++++++++++++++++++++++++++++++++
3 files changed, 95 insertions(+)
diff --git a/pulsar/asyncio.py b/pulsar/asyncio.py
index a1ca0c0..a429cf8 100644
--- a/pulsar/asyncio.py
+++ b/pulsar/asyncio.py
@@ -39,6 +39,7 @@ from _pulsar import (
ConsumerCryptoFailureAction,
)
import pulsar
+from pulsar import _check_type
class PulsarException(BaseException):
"""
@@ -775,6 +776,31 @@ class Client:
schema.attach_client(self._client)
return Consumer(await future, schema)
+
+ async def get_topic_partitions(self, topic: str) -> List[str]:
+ """
+ Get the list of partitions for a given topic in asynchronous mode.
+
+ If the topic is partitioned, this will return a list of partition
names. If the topic is not partitioned, the returned list will contain the
topic name itself.
+
+ This can be used to discover the partitions and create Reader,
Consumer or Producer instances directly on a particular partition.
+
+ Parameters
+ ----------
+
+ topic: str
+ the topic name to lookup
+
+ Returns
+ -------
+ list
+ a list of partition names
+ """
+ _check_type(str, topic, 'topic')
+ future = asyncio.get_running_loop().create_future()
+ self._client.get_topic_partitions_async(topic,
functools.partial(_set_future, future))
+ id = await future
+ return id
async def close(self) -> None:
"""
diff --git a/src/client.cc b/src/client.cc
index 64056df..d77938f 100644
--- a/src/client.cc
+++ b/src/client.cc
@@ -65,6 +65,11 @@ std::vector<std::string> Client_getTopicPartitions(Client&
client, const std::st
[&](GetPartitionsCallback callback) {
client.getPartitionsForTopicAsync(topic, callback); });
}
+void Client_getTopicPartitionsAsync(Client &client, const std::string& topic,
GetPartitionsCallback callback) {
+ py::gil_scoped_release release;
+ client.getPartitionsForTopicAsync(topic, callback);
+}
+
SchemaInfo Client_getSchemaInfo(Client& client, const std::string& topic,
int64_t version) {
return waitForAsyncValue<SchemaInfo>([&](std::function<void(Result, const
SchemaInfo&)> callback) {
client.getSchemaInfoAsync(topic, version, callback);
@@ -119,6 +124,7 @@ void export_client(py::module_& m) {
.def("get_schema_info", &Client_getSchemaInfo)
.def("close", &Client_close)
.def("close_async", &Client_closeAsync)
+ .def("get_topic_partitions_async", &Client_getTopicPartitionsAsync)
.def("subscribe_async", &Client_subscribeAsync)
.def("subscribe_async_topics", &Client_subscribeAsync_topics)
.def("subscribe_async_pattern", &Client_subscribeAsync_pattern)
diff --git a/tests/asyncio_test.py b/tests/asyncio_test.py
index 4440809..32371cb 100644
--- a/tests/asyncio_test.py
+++ b/tests/asyncio_test.py
@@ -47,7 +47,29 @@ from pulsar.schema import ( # pylint: disable=import-error
String,
)
+from urllib.request import urlopen, Request
+
SERVICE_URL = 'pulsar://localhost:6650'
+ADMIN_URL = "http://localhost:8080"
+TIMEOUT_MS = 10000 # Do not wait forever in tests
+
+def doHttpPost(url, data):
+ req = Request(url, data.encode())
+ req.add_header("Content-Type", "application/json")
+ urlopen(req)
+
+def doHttpPut(url, data):
+ try:
+ req = Request(url, data.encode())
+ req.add_header("Content-Type", "application/json")
+ req.get_method = lambda: "PUT"
+ urlopen(req)
+ except Exception as ex:
+ # ignore conflicts exception to have test idempotency
+ if "409" in str(ex):
+ pass
+ else:
+ raise ex
class AsyncioTest(IsolatedAsyncioTestCase):
"""Test cases for asyncio Pulsar client."""
@@ -133,6 +155,47 @@ class AsyncioTest(IsolatedAsyncioTestCase):
self.assertEqual(msg_id0.batch_index(), 0)
self.assertEqual(msg_id1.batch_index(), 1)
+ async def test_get_topics_partitions(self):
+ topic_partitioned =
"persistent://public/default/test_get_topics_partitions_async"
+ topic_non_partitioned =
"persistent://public/default/test_get_topics_async_not-partitioned"
+
+ url1 = ADMIN_URL +
"/admin/v2/persistent/public/default/test_get_topics_partitions_async/partitions"
+ doHttpPut(url1, "3")
+
+ self.assertEqual(
+ await self._client.get_topic_partitions(topic_partitioned),
+ [
+
"persistent://public/default/test_get_topics_partitions_async-partition-0",
+
"persistent://public/default/test_get_topics_partitions_async-partition-1",
+
"persistent://public/default/test_get_topics_partitions_async-partition-2",
+ ],
+ )
+ self.assertEqual(await
self._client.get_topic_partitions(topic_non_partitioned),
[topic_non_partitioned])
+
+ async def test_get_partitioned_topic_name(self):
+ url1 = ADMIN_URL +
"/admin/v2/persistent/public/default/partitioned_topic_name_test/partitions"
+ doHttpPut(url1, "3")
+
+ partitions = [
+
"persistent://public/default/partitioned_topic_name_test-partition-0",
+
"persistent://public/default/partitioned_topic_name_test-partition-1",
+
"persistent://public/default/partitioned_topic_name_test-partition-2",
+ ]
+ self.assertEqual(
+ await
self._client.get_topic_partitions("persistent://public/default/partitioned_topic_name_test"),
partitions
+ )
+
+ consumer = await self._client.subscribe(
+ "persistent://public/default/partitioned_topic_name_test",
+ "partitioned_topic_name_test_sub",
+ consumer_type=pulsar.ConsumerType.Shared,
+ )
+ producer = await
self._client.create_producer("persistent://public/default/partitioned_topic_name_test")
+ await producer.send(b"hello")
+
+ msg = await asyncio.wait_for(consumer.receive(), TIMEOUT_MS / 1000)
+ self.assertTrue(msg.topic_name() in partitions)
+
async def test_create_producer_failure(self):
try:
await
self._client.create_producer('tenant/ns/asyncio-test-send-failure')