This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-python.git
The following commit(s) were added to refs/heads/main by this push:
new 4b61547 [asyncio] Support creating producer and sending messages (#189)
4b61547 is described below
commit 4b61547e0f79011b1d76e24a7a9745c1d3d6e36f
Author: Yunze Xu <[email protected]>
AuthorDate: Thu Jan 4 11:50:15 2024 +0800
[asyncio] Support creating producer and sending messages (#189)
Master issue: https://github.com/apache/pulsar-client-python/issues/55
### Modifications
Introduce a `pulsar.asyncio` module that provides asynchronous APIs
designed to work with Python's `asyncio` module.
Example:
```python
async def main():
    client = Client('pulsar://localhost:6650')
    try:
        producer = await client.create_producer('topic')
        msg_id = await producer.send('msg'.encode())
        await producer.close()
    except PulsarException as e:
        error = e.error()
    await client.close()
```
Creating a `Client` accepts the same keyword arguments as
`pulsar.Client`, but the options for creating a producer and sending
messages have not been added yet.
---
pulsar/__init__.py | 6 ++
pulsar/asyncio.py | 172 ++++++++++++++++++++++++++++++++++++++++++++++++
src/client.cc | 14 ++++
src/producer.cc | 6 ++
tests/asyncio_test.py | 85 ++++++++++++++++++++++++
tests/run-unit-tests.sh | 1 +
6 files changed, 284 insertions(+)
diff --git a/pulsar/__init__.py b/pulsar/__init__.py
index a44a0db..29bb034 100644
--- a/pulsar/__init__.py
+++ b/pulsar/__init__.py
@@ -105,6 +105,12 @@ class MessageId:
"""
return self._msg_id.serialize()
+ def __str__(self) -> str:
+ """
+ Returns the string representation of the message id.
+ """
+ return str(self._msg_id)
+
@staticmethod
def deserialize(message_id_bytes):
"""
diff --git a/pulsar/asyncio.py b/pulsar/asyncio.py
new file mode 100644
index 0000000..445d477
--- /dev/null
+++ b/pulsar/asyncio.py
@@ -0,0 +1,172 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+The Pulsar Python client APIs that work with the asyncio module.
+"""
+
+import asyncio
+import functools
+from typing import Any
+
+import _pulsar
+import pulsar
+
+class PulsarException(BaseException):
+ """
+ The exception that wraps the Pulsar error code
+ """
+
+ def __init__(self, result: pulsar.Result) -> None:
+ """
+ Create the Pulsar exception.
+
+ Parameters
+ ----------
+ result: pulsar.Result
+ The error code of the underlying Pulsar APIs.
+ """
+ self._result = result
+
+ def error(self) -> pulsar.Result:
+ """
+ Returns the Pulsar error code.
+ """
+ return self._result
+
+ def __str__(self):
+ """
+ Convert the exception to string.
+ """
+ return f'{self._result.value} {self._result.name}'
+
+class Producer:
+ """
+ The Pulsar message producer, used to publish messages on a topic.
+ """
+
+ def __init__(self, producer: _pulsar.Producer) -> None:
+ """
+ Create the producer.
+ Users should not call this constructor directly. Instead, create the
+ producer via `Client.create_producer`.
+
+ Parameters
+ ----------
+ producer: _pulsar.Producer
+ The underlying Producer object from the C extension.
+ """
+ self._producer: _pulsar.Producer = producer
+
+ async def send(self, content: bytes) -> pulsar.MessageId:
+ """
+ Send a message asynchronously.
+
+ parameters
+ ----------
+ content: bytes
+ The message payload
+
+ Returns
+ -------
+ pulsar.MessageId
+ The message id that represents the persisted position of the message.
+
+ Raises
+ ------
+ PulsarException
+ """
+ builder = _pulsar.MessageBuilder()
+ builder.content(content)
+ future = asyncio.get_running_loop().create_future()
+ self._producer.send_async(builder.build(), functools.partial(_set_future, future))
+ msg_id = await future
+ return pulsar.MessageId(
+ msg_id.partition(),
+ msg_id.ledger_id(),
+ msg_id.entry_id(),
+ msg_id.batch_index(),
+ )
+
+ async def close(self) -> None:
+ """
+ Close the producer.
+
+ Raises
+ ------
+ PulsarException
+ """
+ future = asyncio.get_running_loop().create_future()
+ self._producer.close_async(functools.partial(_set_future, future, value=None))
+ await future
+
+class Client:
+ """
+ The asynchronous version of `pulsar.Client`.
+ """
+
+ def __init__(self, service_url, **kwargs) -> None:
+ """
+ See `pulsar.Client.__init__`
+ """
+ self._client: _pulsar.Client = pulsar.Client(service_url, **kwargs)._client
+
+ async def create_producer(self, topic: str) -> Producer:
+ """
+ Create a new producer on a given topic
+
+ Parameters
+ ----------
+ topic: str
+ The topic name
+
+ Returns
+ -------
+ Producer
+ The producer created
+
+ Raises
+ ------
+ PulsarException
+ """
+ future = asyncio.get_running_loop().create_future()
+ conf = _pulsar.ProducerConfiguration()
+ # TODO: add more configs
+ self._client.create_producer_async(topic, conf, functools.partial(_set_future, future))
+ return Producer(await future)
+
+ async def close(self) -> None:
+ """
+ Close the client and all the associated producers and consumers
+
+ Raises
+ ------
+ PulsarException
+ """
+ future = asyncio.get_running_loop().create_future()
+ self._client.close_async(functools.partial(_set_future, future, value=None))
+ await future
+
+def _set_future(future: asyncio.Future, result: _pulsar.Result, value: Any):
+ def complete():
+ if result == _pulsar.Result.Ok:
+ future.set_result(value)
+ else:
+ future.set_exception(PulsarException(result))
+ future.get_loop().call_soon_threadsafe(complete)
diff --git a/src/client.cc b/src/client.cc
index 626ff9f..b25c63a 100644
--- a/src/client.cc
+++ b/src/client.cc
@@ -18,6 +18,7 @@
*/
#include "utils.h"
+#include <pybind11/functional.h>
#include <pybind11/pybind11.h>
#include <pybind11/stl.h>
@@ -28,6 +29,12 @@ Producer Client_createProducer(Client& client, const std::string& topic, const P
[&](CreateProducerCallback callback) { client.createProducerAsync(topic, conf, callback); });
}
+void Client_createProducerAsync(Client& client, const std::string& topic, ProducerConfiguration conf,
+ CreateProducerCallback callback) {
+ py::gil_scoped_release release;
+ client.createProducerAsync(topic, conf, callback);
+}
+
Consumer Client_subscribe(Client& client, const std::string& topic, const std::string& subscriptionName,
const ConsumerConfiguration& conf) {
return waitForAsyncValue<Consumer>(
@@ -68,10 +75,16 @@ void Client_close(Client& client) {
waitForAsyncResult([&](ResultCallback callback) { client.closeAsync(callback); });
}
+void Client_closeAsync(Client& client, ResultCallback callback) {
+ py::gil_scoped_release release;
+ client.closeAsync(callback);
+}
+
void export_client(py::module_& m) {
py::class_<Client, std::shared_ptr<Client>>(m, "Client")
.def(py::init<const std::string&, const ClientConfiguration&>())
.def("create_producer", &Client_createProducer)
+ .def("create_producer_async", &Client_createProducerAsync)
.def("subscribe", &Client_subscribe)
.def("subscribe_topics", &Client_subscribe_topics)
.def("subscribe_pattern", &Client_subscribe_pattern)
@@ -79,5 +92,6 @@ void export_client(py::module_& m) {
.def("get_topic_partitions", &Client_getTopicPartitions)
.def("get_schema_info", &Client_getSchemaInfo)
.def("close", &Client_close)
+ .def("close_async", &Client_closeAsync)
.def("shutdown", &Client::shutdown);
}
diff --git a/src/producer.cc b/src/producer.cc
index 7027185..9b38016 100644
--- a/src/producer.cc
+++ b/src/producer.cc
@@ -46,6 +46,11 @@ void Producer_close(Producer& producer) {
waitForAsyncResult([&](ResultCallback callback) { producer.closeAsync(callback); });
}
+void Producer_closeAsync(Producer& producer, ResultCallback callback) {
+ py::gil_scoped_release release;
+ producer.closeAsync(callback);
+}
+
void export_producer(py::module_& m) {
using namespace py;
@@ -76,5 +81,6 @@ void export_producer(py::module_& m) {
"Flush all the messages buffered in the client and wait until all messages have been\n"
"successfully persisted\n")
.def("close", &Producer_close)
+ .def("close_async", &Producer_closeAsync)
.def("is_connected", &Producer::isConnected);
}
diff --git a/tests/asyncio_test.py b/tests/asyncio_test.py
new file mode 100644
index 0000000..5478b60
--- /dev/null
+++ b/tests/asyncio_test.py
@@ -0,0 +1,85 @@
+#!/usr/bin/env python3
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import asyncio
+import pulsar
+from pulsar.asyncio import (
+ Client,
+ PulsarException,
+)
+from unittest import (
+ main,
+ IsolatedAsyncioTestCase,
+)
+
+service_url = 'pulsar://localhost:6650'
+
+class AsyncioTest(IsolatedAsyncioTestCase):
+
+ async def asyncSetUp(self) -> None:
+ self._client = Client(service_url)
+
+ async def asyncTearDown(self) -> None:
+ await self._client.close()
+
+ async def test_batch_send(self):
+ producer = await self._client.create_producer('awaitio-test-batch-send')
+ tasks = []
+ for i in range(5):
+ tasks.append(asyncio.create_task(producer.send(f'msg-{i}'.encode())))
+ msg_ids = await asyncio.gather(*tasks)
+ self.assertEqual(len(msg_ids), 5)
+ ledger_id = msg_ids[0].ledger_id()
+ entry_id = msg_ids[0].entry_id()
+ # These messages should be in the same entry
+ for i in range(5):
+ msg_id = msg_ids[i]
+ print(f'{i} was sent to {msg_id}')
+ self.assertIsInstance(msg_id, pulsar.MessageId)
+ self.assertEqual(msg_ids[i].ledger_id(), ledger_id)
+ self.assertEqual(msg_ids[i].entry_id(), entry_id)
+ self.assertEqual(msg_ids[i].batch_index(), i)
+
+ async def test_create_producer_failure(self):
+ try:
+ await self._client.create_producer('tenant/ns/awaitio-test-send-failure')
+ self.fail()
+ except PulsarException as e:
+ self.assertEqual(e.error(), pulsar.Result.AuthorizationError)
+
+ async def test_send_failure(self):
+ producer = await self._client.create_producer('awaitio-test-send-failure')
+ try:
+ await producer.send(('x' * 1024 * 1024 * 10).encode())
+ self.fail()
+ except PulsarException as e:
+ self.assertEqual(e.error(), pulsar.Result.MessageTooBig)
+
+ async def test_close_producer(self):
+ producer = await self._client.create_producer('awaitio-test-close-producer')
+ await producer.close()
+ try:
+ await producer.close()
+ self.fail()
+ except PulsarException as e:
+ self.assertEqual(e.error(), pulsar.Result.AlreadyClosed)
+
+if __name__ == '__main__':
+ main()
diff --git a/tests/run-unit-tests.sh b/tests/run-unit-tests.sh
index 5168f94..ea0b450 100755
--- a/tests/run-unit-tests.sh
+++ b/tests/run-unit-tests.sh
@@ -26,3 +26,4 @@ cd $ROOT_DIR/tests
python3 custom_logger_test.py
python3 interrupted_test.py
python3 pulsar_test.py
+python3 asyncio_test.py