This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 4b61547  [asyncio] Support creating producer and sending messages 
(#189)
4b61547 is described below

commit 4b61547e0f79011b1d76e24a7a9745c1d3d6e36f
Author: Yunze Xu <[email protected]>
AuthorDate: Thu Jan 4 11:50:15 2024 +0800

    [asyncio] Support creating producer and sending messages (#189)
    
    Master issue: https://github.com/apache/pulsar-client-python/issues/55
    
    ### Modifications
    
    Introduce a `pulsar.asyncio` module that provides asynchronous APIs
    that work with Python's asyncio module.
    
    Example:
    
    ```python
    async def main():
        client = Client('pulsar://localhost:6650')
        try:
            producer = await client.create_producer('topic')
            msg_id = await producer.send('msg'.encode())
            await producer.close()
        except PulsarException as e:
            error = e.error()
        await client.close()
    ```
    
    The creation of `Client` reuses the same keyword arguments from
    `pulsar.Client` but the options for creating producer and sending
    messages are not added yet.
---
 pulsar/__init__.py      |   6 ++
 pulsar/asyncio.py       | 172 ++++++++++++++++++++++++++++++++++++++++++++++++
 src/client.cc           |  14 ++++
 src/producer.cc         |   6 ++
 tests/asyncio_test.py   |  85 ++++++++++++++++++++++++
 tests/run-unit-tests.sh |   1 +
 6 files changed, 284 insertions(+)

diff --git a/pulsar/__init__.py b/pulsar/__init__.py
index a44a0db..29bb034 100644
--- a/pulsar/__init__.py
+++ b/pulsar/__init__.py
@@ -105,6 +105,12 @@ class MessageId:
         """
         return self._msg_id.serialize()
 
+    def __str__(self) -> str:
+        """
+        Render this message id as text by delegating to the string
+        conversion of the wrapped `_msg_id` object.
+        """
+        text = str(self._msg_id)
+        return text
+
     @staticmethod
     def deserialize(message_id_bytes):
         """
diff --git a/pulsar/asyncio.py b/pulsar/asyncio.py
new file mode 100644
index 0000000..445d477
--- /dev/null
+++ b/pulsar/asyncio.py
@@ -0,0 +1,172 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+The Pulsar Python client APIs that work with the asyncio module.
+"""
+
+import asyncio
+import functools
+from typing import Any
+
+import _pulsar
+import pulsar
+
+class PulsarException(Exception):
+    """
+    The exception that wraps the Pulsar error code.
+
+    It derives from `Exception` (not `BaseException`) so that generic
+    `except Exception` handlers treat a Pulsar failure like any other
+    application error.
+    """
+
+    def __init__(self, result: pulsar.Result) -> None:
+        """
+        Create the Pulsar exception.
+
+        Parameters
+        ----------
+        result: pulsar.Result
+            The error code of the underlying Pulsar APIs.
+        """
+        super().__init__(result)
+        self._result = result
+
+    def error(self) -> pulsar.Result:
+        """
+        Returns the Pulsar error code.
+        """
+        return self._result
+
+    def __str__(self) -> str:
+        """
+        Convert the exception to a string of the form `<value> <name>`,
+        built from the wrapped error code.
+        """
+        return f'{self._result.value} {self._result.name}'
+
+class Producer:
+    """
+    The asyncio-based Pulsar message producer, used to publish messages on
+    a topic.
+    """
+
+    def __init__(self, producer: _pulsar.Producer) -> None:
+        """
+        Wrap a producer object from the C extension.
+
+        Users should not call this constructor directly; obtain a producer
+        via `Client.create_producer` instead.
+
+        Parameters
+        ----------
+        producer: _pulsar.Producer
+            The underlying Producer object from the C extension.
+        """
+        self._producer: _pulsar.Producer = producer
+
+    async def send(self, content: bytes) -> pulsar.MessageId:
+        """
+        Send a message asynchronously.
+
+        Parameters
+        ----------
+        content: bytes
+            The message payload
+
+        Returns
+        -------
+        pulsar.MessageId
+            The message id that represents the persisted position of the
+            message.
+
+        Raises
+        ------
+        PulsarException
+        """
+        builder = _pulsar.MessageBuilder()
+        builder.content(content)
+        loop = asyncio.get_running_loop()
+        pending = loop.create_future()
+        message = builder.build()
+        # `_set_future` completes `pending` with the value passed to the
+        # callback, or fails it with a PulsarException.
+        on_sent = functools.partial(_set_future, pending)
+        self._producer.send_async(message, on_sent)
+        raw_id = await pending
+        partition = raw_id.partition()
+        ledger_id = raw_id.ledger_id()
+        entry_id = raw_id.entry_id()
+        batch_index = raw_id.batch_index()
+        return pulsar.MessageId(partition, ledger_id, entry_id, batch_index)
+
+    async def close(self) -> None:
+        """
+        Close the producer.
+
+        Raises
+        ------
+        PulsarException
+        """
+        loop = asyncio.get_running_loop()
+        pending = loop.create_future()
+        # `value` is pre-bound to None: on success the future resolves to None.
+        on_closed = functools.partial(_set_future, pending, value=None)
+        self._producer.close_async(on_closed)
+        await pending
+
+class Client:
+    """
+    The asyncio counterpart of `pulsar.Client`.
+    """
+
+    def __init__(self, service_url, **kwargs) -> None:
+        """
+        See `pulsar.Client.__init__`
+        """
+        sync_client = pulsar.Client(service_url, **kwargs)
+        # Only the underlying C extension client is retained.
+        self._client: _pulsar.Client = sync_client._client
+
+    async def create_producer(self, topic: str) -> Producer:
+        """
+        Create a new producer on a given topic.
+
+        Parameters
+        ----------
+        topic: str
+            The topic name
+
+        Returns
+        -------
+        Producer
+            The producer created
+
+        Raises
+        ------
+        PulsarException
+        """
+        loop = asyncio.get_running_loop()
+        pending = loop.create_future()
+        conf = _pulsar.ProducerConfiguration()
+        # TODO: add more configs
+        on_created = functools.partial(_set_future, pending)
+        self._client.create_producer_async(topic, conf, on_created)
+        raw_producer = await pending
+        return Producer(raw_producer)
+
+    async def close(self) -> None:
+        """
+        Close the client and all the associated producers and consumers.
+
+        Raises
+        ------
+        PulsarException
+        """
+        loop = asyncio.get_running_loop()
+        pending = loop.create_future()
+        # `value` is pre-bound to None: on success the future resolves to None.
+        on_closed = functools.partial(_set_future, pending, value=None)
+        self._client.close_async(on_closed)
+        await pending
+
+def _set_future(future: asyncio.Future, result: _pulsar.Result, value: Any) -> None:
+    """
+    Complete `future` on its own event loop with the outcome of a Pulsar
+    callback.
+
+    The completion is scheduled through `call_soon_threadsafe`, so this
+    function may be called from a thread other than the one running the
+    event loop.
+
+    Parameters
+    ----------
+    future: asyncio.Future
+        The future awaited by the coroutine that started the operation.
+    result: _pulsar.Result
+        The result code reported by the callback.
+    value: Any
+        The value to resolve the future with when `result` is `Ok`.
+    """
+    def complete() -> None:
+        # The awaiting task may have been cancelled (e.g. by a timeout)
+        # before the callback fired. Completing a future that is already
+        # done raises InvalidStateError, so drop the late outcome instead.
+        if future.done():
+            return
+        if result == _pulsar.Result.Ok:
+            future.set_result(value)
+        else:
+            future.set_exception(PulsarException(result))
+    future.get_loop().call_soon_threadsafe(complete)
diff --git a/src/client.cc b/src/client.cc
index 626ff9f..b25c63a 100644
--- a/src/client.cc
+++ b/src/client.cc
@@ -18,6 +18,7 @@
  */
 #include "utils.h"
 
+#include <pybind11/functional.h>
 #include <pybind11/pybind11.h>
 #include <pybind11/stl.h>
 
@@ -28,6 +29,12 @@ Producer Client_createProducer(Client& client, const 
std::string& topic, const P
         [&](CreateProducerCallback callback) { 
client.createProducerAsync(topic, conf, callback); });
 }
 
+// Non-blocking variant of Client_createProducer: forwards the topic,
+// configuration and callback to Client::createProducerAsync and returns
+// without waiting for the callback to run.
+void Client_createProducerAsync(Client& client, const std::string& topic, 
ProducerConfiguration conf,
+                                CreateProducerCallback callback) {
+    // Release the GIL for the duration of the native call.
+    py::gil_scoped_release release;
+    client.createProducerAsync(topic, conf, callback);
+}
+
 Consumer Client_subscribe(Client& client, const std::string& topic, const 
std::string& subscriptionName,
                           const ConsumerConfiguration& conf) {
     return waitForAsyncValue<Consumer>(
@@ -68,10 +75,16 @@ void Client_close(Client& client) {
     waitForAsyncResult([&](ResultCallback callback) { 
client.closeAsync(callback); });
 }
 
+// Non-blocking variant of Client_close: forwards the callback to
+// Client::closeAsync and returns without waiting for the callback to run.
+void Client_closeAsync(Client& client, ResultCallback callback) {
+    // Release the GIL for the duration of the native call.
+    py::gil_scoped_release release;
+    client.closeAsync(callback);
+}
+
 void export_client(py::module_& m) {
     py::class_<Client, std::shared_ptr<Client>>(m, "Client")
         .def(py::init<const std::string&, const ClientConfiguration&>())
         .def("create_producer", &Client_createProducer)
+        .def("create_producer_async", &Client_createProducerAsync)
         .def("subscribe", &Client_subscribe)
         .def("subscribe_topics", &Client_subscribe_topics)
         .def("subscribe_pattern", &Client_subscribe_pattern)
@@ -79,5 +92,6 @@ void export_client(py::module_& m) {
         .def("get_topic_partitions", &Client_getTopicPartitions)
         .def("get_schema_info", &Client_getSchemaInfo)
         .def("close", &Client_close)
+        .def("close_async", &Client_closeAsync)
         .def("shutdown", &Client::shutdown);
 }
diff --git a/src/producer.cc b/src/producer.cc
index 7027185..9b38016 100644
--- a/src/producer.cc
+++ b/src/producer.cc
@@ -46,6 +46,11 @@ void Producer_close(Producer& producer) {
     waitForAsyncResult([&](ResultCallback callback) { 
producer.closeAsync(callback); });
 }
 
+// Non-blocking variant of Producer_close: forwards the callback to
+// Producer::closeAsync and returns without waiting for the callback to run.
+void Producer_closeAsync(Producer& producer, ResultCallback callback) {
+    // Release the GIL for the duration of the native call.
+    py::gil_scoped_release release;
+    producer.closeAsync(callback);
+}
+
 void export_producer(py::module_& m) {
     using namespace py;
 
@@ -76,5 +81,6 @@ void export_producer(py::module_& m) {
              "Flush all the messages buffered in the client and wait until all 
messages have been\n"
              "successfully persisted\n")
         .def("close", &Producer_close)
+        .def("close_async", &Producer_closeAsync)
         .def("is_connected", &Producer::isConnected);
 }
diff --git a/tests/asyncio_test.py b/tests/asyncio_test.py
new file mode 100644
index 0000000..5478b60
--- /dev/null
+++ b/tests/asyncio_test.py
@@ -0,0 +1,85 @@
+#!/usr/bin/env python3
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import asyncio
+import pulsar
+from pulsar.asyncio import (
+    Client,
+    PulsarException,
+)
+from unittest import (
+    main,
+    IsolatedAsyncioTestCase,
+)
+
+# Service URL that every test's client connects to (see asyncSetUp).
+service_url = 'pulsar://localhost:6650'
+
+class AsyncioTest(IsolatedAsyncioTestCase):
+    """
+    Tests for the `pulsar.asyncio` client, run against `service_url`.
+    """
+
+    async def asyncSetUp(self) -> None:
+        # Each test gets its own client.
+        self._client = Client(service_url)
+
+    async def asyncTearDown(self) -> None:
+        await self._client.close()
+
+    async def test_batch_send(self):
+        producer = await self._client.create_producer('awaitio-test-batch-send')
+        tasks = [
+            asyncio.create_task(producer.send(f'msg-{i}'.encode()))
+            for i in range(5)
+        ]
+        msg_ids = await asyncio.gather(*tasks)
+        self.assertEqual(len(msg_ids), 5)
+        first = msg_ids[0]
+        ledger_id = first.ledger_id()
+        entry_id = first.entry_id()
+        # These messages should be in the same entry
+        for i, msg_id in enumerate(msg_ids):
+            print(f'{i} was sent to {msg_id}')
+            self.assertIsInstance(msg_id, pulsar.MessageId)
+            self.assertEqual(msg_id.ledger_id(), ledger_id)
+            self.assertEqual(msg_id.entry_id(), entry_id)
+            self.assertEqual(msg_id.batch_index(), i)
+
+    async def test_create_producer_failure(self):
+        topic = 'tenant/ns/awaitio-test-send-failure'
+        try:
+            await self._client.create_producer(topic)
+        except PulsarException as e:
+            self.assertEqual(e.error(), pulsar.Result.AuthorizationError)
+        else:
+            # The creation was expected to be rejected.
+            self.fail()
+
+    async def test_send_failure(self):
+        producer = await self._client.create_producer('awaitio-test-send-failure')
+        payload = ('x' * 1024 * 1024 * 10).encode()
+        try:
+            await producer.send(payload)
+        except PulsarException as e:
+            self.assertEqual(e.error(), pulsar.Result.MessageTooBig)
+        else:
+            # The oversized message was expected to be rejected.
+            self.fail()
+
+    async def test_close_producer(self):
+        producer = await self._client.create_producer('awaitio-test-close-producer')
+        await producer.close()
+        try:
+            await producer.close()
+        except PulsarException as e:
+            self.assertEqual(e.error(), pulsar.Result.AlreadyClosed)
+        else:
+            # Closing twice was expected to fail.
+            self.fail()
+
+# Run the tests when this file is executed as a script
+# (e.g. `python3 asyncio_test.py`).
+if __name__ == '__main__':
+    main()
diff --git a/tests/run-unit-tests.sh b/tests/run-unit-tests.sh
index 5168f94..ea0b450 100755
--- a/tests/run-unit-tests.sh
+++ b/tests/run-unit-tests.sh
@@ -26,3 +26,4 @@ cd $ROOT_DIR/tests
 python3 custom_logger_test.py
 python3 interrupted_test.py
 python3 pulsar_test.py
+python3 asyncio_test.py

Reply via email to