This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 961411f  Implement async consumer (#277)
961411f is described below

commit 961411f896564e0ce0c40f63aa09cb2917b77587
Author: Yunze Xu <[email protected]>
AuthorDate: Wed Dec 24 16:43:20 2025 +0800

    Implement async consumer (#277)
---
 pulsar/asyncio.py     | 401 ++++++++++++++++++++++++++++++++++++++++++++++++--
 src/client.cc         |  23 +++
 src/consumer.cc       |  64 +++++++-
 tests/asyncio_test.py | 220 +++++++++++++++++++++++++--
 4 files changed, 680 insertions(+), 28 deletions(-)

diff --git a/pulsar/asyncio.py b/pulsar/asyncio.py
index 445d477..01246c6 100644
--- a/pulsar/asyncio.py
+++ b/pulsar/asyncio.py
@@ -17,15 +17,22 @@
 # under the License.
 #
 
+# pylint: disable=no-name-in-module,c-extension-no-member,protected-access
+
 """
 The Pulsar Python client APIs that work with the asyncio module.
 """
 
 import asyncio
 import functools
-from typing import Any
+from typing import Any, List, Union
 
 import _pulsar
+from _pulsar import (
+    InitialPosition,
+    RegexSubscriptionMode,
+    ConsumerCryptoFailureAction,
+)
 import pulsar
 
 class PulsarException(BaseException):
@@ -61,7 +68,7 @@ class Producer:
     The Pulsar message producer, used to publish messages on a topic.
     """
 
-    def __init__(self, producer: _pulsar.Producer) -> None:
+    def __init__(self, producer: _pulsar.Producer, schema: pulsar.schema.Schema) -> None:
         """
         Create the producer.
         Users should not call this constructor directly. Instead, create the
@@ -71,17 +78,21 @@ class Producer:
         ----------
         producer: _pulsar.Producer
             The underlying Producer object from the C extension.
+        schema: pulsar.schema.Schema
+            The schema of the data that will be sent by this producer.
         """
-        self._producer: _pulsar.Producer = producer
+        self._producer = producer
+        self._schema = schema
 
-    async def send(self, content: bytes) -> pulsar.MessageId:
+    async def send(self, content: Any) -> pulsar.MessageId:
         """
         Send a message asynchronously.
 
         parameters
         ----------
-        content: bytes
-            The message payload
+        content: Any
+            The message payload, whose type should respect the schema defined in
+            `Client.create_producer`.
 
         Returns
         -------
@@ -93,7 +104,7 @@ class Producer:
         PulsarException
         """
         builder = _pulsar.MessageBuilder()
-        builder.content(content)
+        builder.content(self._schema.encode(content))
         future = asyncio.get_running_loop().create_future()
         self._producer.send_async(builder.build(), functools.partial(_set_future, future))
         msg_id = await future
@@ -116,6 +127,177 @@ class Producer:
         self._producer.close_async(functools.partial(_set_future, future, value=None))
         await future
 
+class Consumer:
+    """
+    The Pulsar message consumer, used to subscribe to messages from a topic.
+    """
+
+    def __init__(self, consumer: _pulsar.Consumer, schema: pulsar.schema.Schema) -> None:
+        """
+        Create the consumer.
+        Users should not call this constructor directly. Instead, create the
+        consumer via `Client.subscribe`.
+
+        Parameters
+        ----------
+        consumer: _pulsar.Consumer
+            The underlying Consumer object from the C extension.
+        schema: pulsar.schema.Schema
+            The schema of the data that will be received by this consumer.
+        """
+        self._consumer = consumer
+        self._schema = schema
+
+    async def receive(self) -> pulsar.Message:
+        """
+        Receive a single message asynchronously.
+
+        Returns
+        -------
+        pulsar.Message
+            The message received.
+
+        Raises
+        ------
+        PulsarException
+        """
+        future = asyncio.get_running_loop().create_future()
+        self._consumer.receive_async(functools.partial(_set_future, future))
+        msg = await future
+        m = pulsar.Message()
+        m._message = msg
+        m._schema = self._schema
+        return m
+
+    async def acknowledge(
+        self,
+        message: Union[pulsar.Message, pulsar.MessageId,
+                       _pulsar.Message, _pulsar.MessageId]
+    ) -> None:
+        """
+        Acknowledge the reception of a single message asynchronously.
+
+        Parameters
+        ----------
+        message : Message, MessageId, _pulsar.Message, _pulsar.MessageId
+            The received message or message id.
+
+        Raises
+        ------
+        PulsarException
+        """
+        future = asyncio.get_running_loop().create_future()
+        if isinstance(message, pulsar.Message):
+            msg = message._message
+        elif isinstance(message, pulsar.MessageId):
+            msg = message._msg_id
+        else:
+            msg = message
+        self._consumer.acknowledge_async(msg, functools.partial(_set_future, future, value=None))
+        await future
+
+    async def acknowledge_cumulative(
+        self,
+        message: Union[pulsar.Message, pulsar.MessageId,
+                       _pulsar.Message, _pulsar.MessageId]
+    ) -> None:
+        """
+        Acknowledge the reception of all the messages in the stream up to (and
+        including) the provided message asynchronously.
+
+        Parameters
+        ----------
+        message : Message, MessageId, _pulsar.Message, _pulsar.MessageId
+            The received message or message id.
+
+        Raises
+        ------
+        PulsarException
+        """
+        future = asyncio.get_running_loop().create_future()
+        if isinstance(message, pulsar.Message):
+            msg = message._message
+        elif isinstance(message, pulsar.MessageId):
+            msg = message._msg_id
+        else:
+            msg = message
+        self._consumer.acknowledge_cumulative_async(
+            msg, functools.partial(_set_future, future, value=None)
+        )
+        await future
+
+    async def unsubscribe(self) -> None:
+        """
+        Unsubscribe the current consumer from the topic asynchronously.
+
+        Raises
+        ------
+        PulsarException
+        """
+        future = asyncio.get_running_loop().create_future()
+        self._consumer.unsubscribe_async(functools.partial(_set_future, future, value=None))
+        await future
+
+    async def seek(self, messageid: Union[pulsar.MessageId, int]) -> None:
+        """
+        Reset the subscription associated with this consumer to a specific
+        message id or publish timestamp asynchronously.
+
+        The message id can either be a specific message or represent the first
+        or last messages in the topic.
+
+        Parameters
+        ----------
+        messageid : MessageId or int
+            The message id for seek, OR an integer event time (timestamp) to
+            seek to
+
+        Raises
+        ------
+        PulsarException
+        """
+        future = asyncio.get_running_loop().create_future()
+        if isinstance(messageid, pulsar.MessageId):
+            msg_id = messageid._msg_id
+        elif isinstance(messageid, int):
+            msg_id = messageid
+        else:
+            raise ValueError(f"invalid messageid type {type(messageid)}")
+        self._consumer.seek_async(
+            msg_id, functools.partial(_set_future, future, value=None)
+        )
+        await future
+
+    async def close(self) -> None:
+        """
+        Close the consumer asynchronously.
+
+        Raises
+        ------
+        PulsarException
+        """
+        future = asyncio.get_running_loop().create_future()
+        self._consumer.close_async(functools.partial(_set_future, future, value=None))
+        await future
+
+    def topic(self) -> str:
+        """
+        Return the topic this consumer is subscribed to.
+        """
+        return self._consumer.topic()
+
+    def subscription_name(self) -> str:
+        """
+        Return the subscription name.
+        """
+        return self._consumer.subscription_name()
+
+    def consumer_name(self) -> str:
+        """
+        Return the consumer name.
+        """
+        return self._consumer.consumer_name()
+
 class Client:
     """
     The asynchronous version of `pulsar.Client`.
@@ -127,7 +309,10 @@ class Client:
         """
         self._client: _pulsar.Client = pulsar.Client(service_url, **kwargs)._client
 
-    async def create_producer(self, topic: str) -> Producer:
+    # pylint: disable=too-many-arguments,too-many-locals,too-many-positional-arguments
+    async def create_producer(self, topic: str,
+                              schema: pulsar.schema.Schema | None = None,
+                              ) -> Producer:
         """
         Create a new producer on a given topic
 
@@ -135,6 +320,8 @@ class Client:
         ----------
         topic: str
             The topic name
+        schema: pulsar.schema.Schema | None, default=None
+            Define the schema of the data that will be published by this producer.
 
         Returns
         -------
@@ -145,11 +332,199 @@ class Client:
         ------
         PulsarException
         """
+        if schema is None:
+            schema = pulsar.schema.BytesSchema()
+        schema.attach_client(self._client)
+
         future = asyncio.get_running_loop().create_future()
         conf = _pulsar.ProducerConfiguration()
-        # TODO: add more configs
-        self._client.create_producer_async(topic, conf, functools.partial(_set_future, future))
-        return Producer(await future)
+        conf.schema(schema.schema_info())
+
+        self._client.create_producer_async(
+            topic, conf, functools.partial(_set_future, future)
+        )
+        return Producer(await future, schema)
+
+    # pylint: disable=too-many-arguments,too-many-locals,too-many-branches,too-many-positional-arguments
+    async def subscribe(self, topic: Union[str, List[str]],
+                        subscription_name: str,
+                        consumer_type: pulsar.ConsumerType =
+                        pulsar.ConsumerType.Exclusive,
+                        schema: pulsar.schema.Schema | None = None,
+                        receiver_queue_size: int = 1000,
+                        max_total_receiver_queue_size_across_partitions: int =
+                        50000,
+                        consumer_name: str | None = None,
+                        unacked_messages_timeout_ms: int | None = None,
+                        broker_consumer_stats_cache_time_ms: int = 30000,
+                        negative_ack_redelivery_delay_ms: int = 60000,
+                        is_read_compacted: bool = False,
+                        properties: dict | None = None,
+                        initial_position: InitialPosition = InitialPosition.Latest,
+                        crypto_key_reader: pulsar.CryptoKeyReader | None = None,
+                        replicate_subscription_state_enabled: bool = False,
+                        max_pending_chunked_message: int = 10,
+                        auto_ack_oldest_chunked_message_on_queue_full: bool = False,
+                        start_message_id_inclusive: bool = False,
+                        batch_receive_policy: pulsar.ConsumerBatchReceivePolicy | None =
+                        None,
+                        key_shared_policy: pulsar.ConsumerKeySharedPolicy | None =
+                        None,
+                        batch_index_ack_enabled: bool = False,
+                        regex_subscription_mode: RegexSubscriptionMode =
+                        RegexSubscriptionMode.PersistentOnly,
+                        dead_letter_policy: pulsar.ConsumerDeadLetterPolicy | None =
+                        None,
+                        crypto_failure_action: ConsumerCryptoFailureAction =
+                        ConsumerCryptoFailureAction.FAIL,
+                        is_pattern_topic: bool = False) -> Consumer:
+        """
+        Subscribe to the given topic and subscription combination.
+
+        Parameters
+        ----------
+        topic: str, List[str], or regex pattern
+            The name of the topic, list of topics or regex pattern.
+            When `is_pattern_topic` is True, `topic` is treated as a regex.
+        subscription_name: str
+            The name of the subscription.
+        consumer_type: pulsar.ConsumerType, default=pulsar.ConsumerType.Exclusive
+            Select the subscription type to be used when subscribing to the topic.
+        schema: pulsar.schema.Schema | None, default=None
+            Define the schema of the data that will be received by this consumer.
+        receiver_queue_size: int, default=1000
+            Sets the size of the consumer receive queue.
+        max_total_receiver_queue_size_across_partitions: int, default=50000
+            Set the max total receiver queue size across partitions.
+        consumer_name: str | None, default=None
+            Sets the consumer name.
+        unacked_messages_timeout_ms: int | None, default=None
+            Sets the timeout in milliseconds for unacknowledged messages.
+        broker_consumer_stats_cache_time_ms: int, default=30000
+            Sets the time duration for which the broker-side consumer stats
+            will be cached in the client.
+        negative_ack_redelivery_delay_ms: int, default=60000
+            The delay after which to redeliver the messages that failed to be
+            processed.
+        is_read_compacted: bool, default=False
+            Selects whether to read the compacted version of the topic.
+        properties: dict | None, default=None
+            Sets the properties for the consumer.
+        initial_position: InitialPosition, default=InitialPosition.Latest
+            Set the initial position of a consumer when subscribing to the topic.
+        crypto_key_reader: pulsar.CryptoKeyReader | None, default=None
+            Symmetric encryption class implementation.
+        replicate_subscription_state_enabled: bool, default=False
+            Set whether the subscription status should be replicated.
+        max_pending_chunked_message: int, default=10
+            Consumer buffers chunk messages into memory until it receives all the chunks.
+        auto_ack_oldest_chunked_message_on_queue_full: bool, default=False
+            Automatically acknowledge oldest chunked messages on queue
+            full.
+        start_message_id_inclusive: bool, default=False
+            Set the consumer to include the given position of any reset
+            operation.
+        batch_receive_policy: pulsar.ConsumerBatchReceivePolicy | None, default=None
+            Set the batch collection policy for batch receiving.
+        key_shared_policy: pulsar.ConsumerKeySharedPolicy | None, default=None
+            Set the key shared policy for use when the ConsumerType is
+            KeyShared.
+        batch_index_ack_enabled: bool, default=False
+            Enable the batch index acknowledgement.
+        regex_subscription_mode: RegexSubscriptionMode,
+            default=RegexSubscriptionMode.PersistentOnly
+            Set the regex subscription mode for use when the topic is a regex
+            pattern.
+        dead_letter_policy: pulsar.ConsumerDeadLetterPolicy | None, default=None
+            Set dead letter policy for consumer.
+        crypto_failure_action: ConsumerCryptoFailureAction,
+            default=ConsumerCryptoFailureAction.FAIL
+            Set the behavior when the decryption fails.
+        is_pattern_topic: bool, default=False
+            Whether `topic` is a regex pattern. If it's True when `topic` is a list, a ValueError
+            will be raised.
+
+        Returns
+        -------
+        Consumer
+            The consumer created
+
+        Raises
+        ------
+        PulsarException
+        """
+        if schema is None:
+            schema = pulsar.schema.BytesSchema()
+
+        future = asyncio.get_running_loop().create_future()
+        conf = _pulsar.ConsumerConfiguration()
+        conf.consumer_type(consumer_type)
+        conf.regex_subscription_mode(regex_subscription_mode)
+        conf.read_compacted(is_read_compacted)
+        conf.receiver_queue_size(receiver_queue_size)
+        conf.max_total_receiver_queue_size_across_partitions(
+            max_total_receiver_queue_size_across_partitions
+        )
+        if consumer_name:
+            conf.consumer_name(consumer_name)
+        if unacked_messages_timeout_ms:
+            conf.unacked_messages_timeout_ms(unacked_messages_timeout_ms)
+
+        conf.negative_ack_redelivery_delay_ms(negative_ack_redelivery_delay_ms)
+        conf.broker_consumer_stats_cache_time_ms(broker_consumer_stats_cache_time_ms)
+        if properties:
+            for k, v in properties.items():
+                conf.property(k, v)
+        conf.subscription_initial_position(initial_position)
+
+        conf.schema(schema.schema_info())
+
+        if crypto_key_reader:
+            conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
+
+        conf.replicate_subscription_state_enabled(replicate_subscription_state_enabled)
+        conf.max_pending_chunked_message(max_pending_chunked_message)
+        conf.auto_ack_oldest_chunked_message_on_queue_full(
+            auto_ack_oldest_chunked_message_on_queue_full
+        )
+        conf.start_message_id_inclusive(start_message_id_inclusive)
+        if batch_receive_policy:
+            conf.batch_receive_policy(batch_receive_policy.policy())
+
+        if key_shared_policy:
+            conf.key_shared_policy(key_shared_policy.policy())
+        conf.batch_index_ack_enabled(batch_index_ack_enabled)
+        if dead_letter_policy:
+            conf.dead_letter_policy(dead_letter_policy.policy())
+        conf.crypto_failure_action(crypto_failure_action)
+
+        if isinstance(topic, str):
+            if is_pattern_topic:
+                self._client.subscribe_async_pattern(
+                    topic, subscription_name, conf,
+                    functools.partial(_set_future, future)
+                )
+            else:
+                self._client.subscribe_async(
+                    topic, subscription_name, conf,
+                    functools.partial(_set_future, future)
+                )
+        elif isinstance(topic, list):
+            if is_pattern_topic:
+                raise ValueError(
+                    "Argument 'topic' must be a string when "
+                    "'is_pattern_topic' is True; lists of topics do not "
+                    "support pattern subscriptions"
+                )
+            self._client.subscribe_async_topics(
+                topic, subscription_name, conf,
+                functools.partial(_set_future, future)
+            )
+        else:
+            raise ValueError( "Argument 'topic' is expected to be of type 'str' or 'list'")
+
+        schema.attach_client(self._client)
+        return Consumer(await future, schema)
 
     async def close(self) -> None:
         """
@@ -160,7 +535,9 @@ class Client:
         PulsarException
         """
         future = asyncio.get_running_loop().create_future()
-        self._client.close_async(functools.partial(_set_future, future, value=None))
+        self._client.close_async(
+            functools.partial(_set_future, future, value=None)
+        )
         await future
 
 def _set_future(future: asyncio.Future, result: _pulsar.Result, value: Any):
diff --git a/src/client.cc b/src/client.cc
index 72c824f..64056df 100644
--- a/src/client.cc
+++ b/src/client.cc
@@ -80,6 +80,26 @@ void Client_closeAsync(Client& client, ResultCallback callback) {
     client.closeAsync(callback);
 }
 
+void Client_subscribeAsync(Client& client, const std::string& topic, const std::string& subscriptionName,
+                           ConsumerConfiguration conf, SubscribeCallback callback) {
+    py::gil_scoped_release release;
+    client.subscribeAsync(topic, subscriptionName, conf, callback);
+}
+
+void Client_subscribeAsync_topics(Client& client, const std::vector<std::string>& topics,
+                                  const std::string& subscriptionName, ConsumerConfiguration conf,
+                                  SubscribeCallback callback) {
+    py::gil_scoped_release release;
+    client.subscribeAsync(topics, subscriptionName, conf, callback);
+}
+
+void Client_subscribeAsync_pattern(Client& client, const std::string& topic_pattern,
+                                   const std::string& subscriptionName, ConsumerConfiguration conf,
+                                   SubscribeCallback callback) {
+    py::gil_scoped_release release;
+    client.subscribeWithRegexAsync(topic_pattern, subscriptionName, conf, callback);
+}
+
 void export_client(py::module_& m) {
     py::class_<Client, std::shared_ptr<Client>>(m, "Client")
         .def(py::init<const std::string&, const ClientConfiguration&>())
@@ -99,5 +119,8 @@ void export_client(py::module_& m) {
         .def("get_schema_info", &Client_getSchemaInfo)
         .def("close", &Client_close)
         .def("close_async", &Client_closeAsync)
+        .def("subscribe_async", &Client_subscribeAsync)
+        .def("subscribe_async_topics", &Client_subscribeAsync_topics)
+        .def("subscribe_async_pattern", &Client_subscribeAsync_pattern)
         .def("shutdown", &Client::shutdown);
 }
diff --git a/src/consumer.cc b/src/consumer.cc
index e32a865..f1d7367 100644
--- a/src/consumer.cc
+++ b/src/consumer.cc
@@ -19,6 +19,7 @@
 #include "utils.h"
 
 #include <pulsar/Consumer.h>
+#include <pybind11/functional.h>
 #include <pybind11/pybind11.h>
 #include <pybind11/stl.h>
 
@@ -100,12 +101,58 @@ MessageId Consumer_get_last_message_id(Consumer& consumer) {
     MessageId msgId;
     Result res;
     Py_BEGIN_ALLOW_THREADS res = consumer.getLastMessageId(msgId);
-    Py_END_ALLOW_THREADS
-
-        CHECK_RESULT(res);
+    Py_END_ALLOW_THREADS;
+    CHECK_RESULT(res);
     return msgId;
 }
 
+void Consumer_receiveAsync(Consumer& consumer, ReceiveCallback callback) {
+    py::gil_scoped_release release;
+    consumer.receiveAsync(callback);
+}
+
+void Consumer_acknowledgeAsync(Consumer& consumer, const Message& msg, ResultCallback callback) {
+    py::gil_scoped_release release;
+    consumer.acknowledgeAsync(msg, callback);
+}
+
+void Consumer_acknowledgeAsync_message_id(Consumer& consumer, const MessageId& msgId,
+                                          ResultCallback callback) {
+    py::gil_scoped_release release;
+    consumer.acknowledgeAsync(msgId, callback);
+}
+
+void Consumer_acknowledgeCumulativeAsync(Consumer& consumer, const Message& msg, ResultCallback callback) {
+    py::gil_scoped_release release;
+    consumer.acknowledgeCumulativeAsync(msg, callback);
+}
+
+void Consumer_acknowledgeCumulativeAsync_message_id(Consumer& consumer, const MessageId& msgId,
+                                                    ResultCallback callback) {
+    py::gil_scoped_release release;
+    consumer.acknowledgeCumulativeAsync(msgId, callback);
+}
+
+void Consumer_closeAsync(Consumer& consumer, ResultCallback callback) {
+    py::gil_scoped_release release;
+    consumer.closeAsync(callback);
+}
+
+void Consumer_unsubscribeAsync(Consumer& consumer, ResultCallback callback) {
+    py::gil_scoped_release release;
+    consumer.unsubscribeAsync(callback);
+}
+
+void Consumer_seekAsync(Consumer& consumer, const MessageId& msgId, ResultCallback callback) {
+    py::gil_scoped_release release;
+    consumer.seekAsync(msgId, callback);
+}
+
+void Consumer_seekAsync_timestamp(Consumer& consumer, uint64_t timestamp, ResultCallback callback) {
+    py::gil_scoped_release release;
+    consumer.seekAsync(timestamp, callback);
+}
+
 void export_consumer(py::module_& m) {
     py::class_<Consumer>(m, "Consumer")
         .def(py::init<>())
@@ -130,5 +177,14 @@ void export_consumer(py::module_& m) {
         .def("seek", &Consumer_seek)
         .def("seek", &Consumer_seek_timestamp)
         .def("is_connected", &Consumer_is_connected)
-        .def("get_last_message_id", &Consumer_get_last_message_id);
+        .def("get_last_message_id", &Consumer_get_last_message_id)
+        .def("receive_async", &Consumer_receiveAsync)
+        .def("acknowledge_async", &Consumer_acknowledgeAsync)
+        .def("acknowledge_async", &Consumer_acknowledgeAsync_message_id)
+        .def("acknowledge_cumulative_async", &Consumer_acknowledgeCumulativeAsync)
+        .def("acknowledge_cumulative_async", &Consumer_acknowledgeCumulativeAsync_message_id)
+        .def("close_async", &Consumer_closeAsync)
+        .def("unsubscribe_async", &Consumer_unsubscribeAsync)
+        .def("seek_async", &Consumer_seekAsync)
+        .def("seek_async", &Consumer_seekAsync_timestamp);
 }
diff --git a/tests/asyncio_test.py b/tests/asyncio_test.py
index fe6877f..656ffba 100644
--- a/tests/asyncio_test.py
+++ b/tests/asyncio_test.py
@@ -18,30 +18,49 @@
 # under the License.
 #
 
+"""
+Unit tests for asyncio Pulsar client API.
+"""
+
+# pylint: disable=missing-function-docstring
+
 import asyncio
-import pulsar
-from pulsar.asyncio import (
-    Client,
-    PulsarException,
-)
+import time
+from typing import List
 from unittest import (
     main,
     IsolatedAsyncioTestCase,
 )
 
-service_url = 'pulsar://localhost:6650'
+import pulsar  # pylint: disable=import-error
+from pulsar.asyncio import (  # pylint: disable=import-error
+    Client,
+    Consumer,
+    Producer,
+    PulsarException,
+)
+from pulsar.schema import (  # pylint: disable=import-error
+    AvroSchema,
+    Integer,
+    Record,
+    String,
+)
+
+SERVICE_URL = 'pulsar://localhost:6650'
 
 class AsyncioTest(IsolatedAsyncioTestCase):
+    """Test cases for asyncio Pulsar client."""
 
     async def asyncSetUp(self) -> None:
-        self._client = Client(service_url,
+        self._client = Client(SERVICE_URL,
                               operation_timeout_seconds=5)
 
     async def asyncTearDown(self) -> None:
         await self._client.close()
 
-    async def test_batch_send(self):
-        producer = await self._client.create_producer('awaitio-test-batch-send')
+    async def test_batch_end_to_end(self):
+        topic = f'asyncio-test-batch-e2e-{time.time()}'
+        producer = await self._client.create_producer(topic)
         tasks = []
         for i in range(5):
             tasks.append(asyncio.create_task(producer.send(f'msg-{i}'.encode())))
@@ -58,15 +77,28 @@ class AsyncioTest(IsolatedAsyncioTestCase):
             self.assertEqual(msg_ids[i].entry_id(), entry_id)
             self.assertEqual(msg_ids[i].batch_index(), i)
 
+        consumer = await self._client.subscribe(topic, 'sub',
+                                                initial_position=pulsar.InitialPosition.Earliest)
+        for i in range(5):
+            msg = await consumer.receive()
+            self.assertEqual(msg.data(), f'msg-{i}'.encode())
+        await consumer.close()
+
+        # create a different subscription to verify initial position is latest by default
+        consumer = await self._client.subscribe(topic, 'sub2')
+        await producer.send(b'final-message')
+        msg = await consumer.receive()
+        self.assertEqual(msg.data(), b'final-message')
+
     async def test_create_producer_failure(self):
         try:
-            await self._client.create_producer('tenant/ns/awaitio-test-send-failure')
+            await self._client.create_producer('tenant/ns/asyncio-test-send-failure')
             self.fail()
         except PulsarException as e:
             self.assertEqual(e.error(), pulsar.Result.Timeout)
 
     async def test_send_failure(self):
-        producer = await self._client.create_producer('awaitio-test-send-failure')
+        producer = await self._client.create_producer('asyncio-test-send-failure')
         try:
             await producer.send(('x' * 1024 * 1024 * 10).encode())
             self.fail()
@@ -74,7 +106,7 @@ class AsyncioTest(IsolatedAsyncioTestCase):
             self.assertEqual(e.error(), pulsar.Result.MessageTooBig)
 
     async def test_close_producer(self):
-        producer = await self._client.create_producer('awaitio-test-close-producer')
+        producer = await self._client.create_producer('asyncio-test-close-producer')
         await producer.close()
         try:
             await producer.close()
@@ -82,5 +114,169 @@ class AsyncioTest(IsolatedAsyncioTestCase):
         except PulsarException as e:
             self.assertEqual(e.error(), pulsar.Result.AlreadyClosed)
 
+    async def _prepare_messages(self, producer: Producer) -> List[pulsar.MessageId]:
+        msg_ids = []
+        for i in range(5):
+            msg_ids.append(await producer.send(f'msg-{i}'.encode()))
+        return msg_ids
+
+    async def test_consumer_cumulative_acknowledge(self):
+        topic = f'asyncio-test-consumer-cumulative-ack-{time.time()}'
+        sub = 'sub'
+        consumer = await self._client.subscribe(topic, sub)
+        producer = await self._client.create_producer(topic)
+        await self._prepare_messages(producer)
+        last_msg = None
+        for _ in range(5):
+            last_msg = await consumer.receive()
+        await consumer.acknowledge_cumulative(last_msg)
+        await consumer.close()
+
+        consumer = await self._client.subscribe(topic, sub)
+        await producer.send(b'final-message')
+        msg = await consumer.receive()
+        self.assertEqual(msg.data(), b'final-message')
+
+    async def test_consumer_individual_acknowledge(self):
+        topic = f'asyncio-test-consumer-individual-ack-{time.time()}'
+        sub = 'sub'
+        consumer = await self._client.subscribe(topic, sub,
+                                                consumer_type=pulsar.ConsumerType.Shared)
+        producer = await self._client.create_producer(topic)
+        await self._prepare_messages(producer)
+        msgs = []
+        for _ in range(5):
+            msg = await consumer.receive()
+            msgs.append(msg)
+
+        await consumer.acknowledge(msgs[0])
+        await consumer.acknowledge(msgs[2])
+        await consumer.acknowledge(msgs[4])
+        await consumer.close()
+
+        consumer = await self._client.subscribe(topic, sub,
+                                                consumer_type=pulsar.ConsumerType.Shared)
+        msg = await consumer.receive()
+        self.assertEqual(msg.data(), b'msg-1')
+        msg = await consumer.receive()
+        self.assertEqual(msg.data(), b'msg-3')
+
+    async def test_multi_topic_consumer(self):
+        topics = ['asyncio-test-multi-topic-1', 'asyncio-test-multi-topic-2']
+        producers = []
+
+        for topic in topics:
+            producer = await self._client.create_producer(topic)
+            producers.append(producer)
+
+        consumer = await self._client.subscribe(topics, 'test-multi-subscription')
+
+        await producers[0].send(b'message-from-topic-1')
+        await producers[1].send(b'message-from-topic-2')
+
+        async def verify_receive(consumer: Consumer):
+            received_messages = {}
+            for _ in range(2):
+                msg = await consumer.receive()
+                received_messages[msg.data()] = None
+                await consumer.acknowledge(msg.message_id())
+            self.assertEqual(received_messages, {
+                b'message-from-topic-1': None,
+                b'message-from-topic-2': None
+            })
+
+        await verify_receive(consumer)
+        await consumer.close()
+
+        consumer = await self._client.subscribe('public/default/asyncio-test-multi-topic-.*',
+                                                'test-multi-subscription-2',
+                                                is_pattern_topic=True,
+                                                initial_position=pulsar.InitialPosition.Earliest)
+        await verify_receive(consumer)
+        await consumer.close()
+
+    async def test_unsubscribe(self):
+        topic = f'asyncio-test-unsubscribe-{time.time()}'
+        sub = 'sub'
+        consumer = await self._client.subscribe(topic, sub)
+        await consumer.unsubscribe()
+        # Verify the consumer can be created successfully with the same subscription name
+        consumer = await self._client.subscribe(topic, sub)
+        await consumer.close()
+
+    async def test_seek_message_id(self):
+        topic = f'asyncio-test-seek-message-id-{time.time()}'
+        sub = 'sub'
+
+        producer = await self._client.create_producer(topic)
+        msg_ids = await self._prepare_messages(producer)
+
+        consumer = await self._client.subscribe(
+            topic, sub, initial_position=pulsar.InitialPosition.Earliest
+        )
+        await consumer.seek(msg_ids[2])
+        msg = await consumer.receive()
+        self.assertEqual(msg.data(), b'msg-3')
+        await consumer.close()
+
+        consumer = await self._client.subscribe(
+            topic, sub, initial_position=pulsar.InitialPosition.Earliest,
+            start_message_id_inclusive=True
+        )
+        await consumer.seek(msg_ids[2])
+        msg = await consumer.receive()
+        self.assertEqual(msg.data(), b'msg-2')
+        await consumer.close()
+
+    async def test_seek_timestamp(self):
+        topic = f'asyncio-test-seek-timestamp-{time.time()}'
+        sub = 'sub'
+        consumer = await self._client.subscribe(
+            topic, sub, initial_position=pulsar.InitialPosition.Earliest
+        )
+
+        producer = await self._client.create_producer(topic)
+
+        # Send first 3 messages
+        for i in range(3):
+            await producer.send(f'msg-{i}'.encode())
+
+        seek_time = int(time.time() * 1000)
+
+        # Send 2 more messages
+        for i in range(3, 5):
+            await producer.send(f'msg-{i}'.encode())
+
+        # Consume all messages first
+        for i in range(5):
+            msg = await consumer.receive()
+            self.assertEqual(msg.data(), f'msg-{i}'.encode())
+
+        # Seek to the timestamp (should start from msg-3)
+        await consumer.seek(seek_time)
+
+        msg = await consumer.receive()
+        self.assertEqual(msg.data(), b'msg-3')
+
+    async def test_schema(self):
+        class ExampleRecord(Record):  # pylint: disable=too-few-public-methods
+            """Example record schema for testing."""
+            str_field = String()
+            int_field = Integer()
+
+        topic = f'asyncio-test-schema-{time.time()}'
+        producer = await self._client.create_producer(
+                topic, schema=AvroSchema(ExampleRecord)
+        )
+        consumer = await self._client.subscribe(
+            topic, 'sub', schema=AvroSchema(ExampleRecord)
+        )
+        await producer.send(ExampleRecord(str_field='test', int_field=42))
+        msg = await consumer.receive()
+        self.assertIsInstance(msg.value(), ExampleRecord)
+        self.assertEqual(msg.value().str_field, 'test')
+        self.assertEqual(msg.value().int_field, 42)
+
+
 if __name__ == '__main__':
     main()


Reply via email to