This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-python.git
The following commit(s) were added to refs/heads/main by this push:
new 961411f Implement async consumer (#277)
961411f is described below
commit 961411f896564e0ce0c40f63aa09cb2917b77587
Author: Yunze Xu <[email protected]>
AuthorDate: Wed Dec 24 16:43:20 2025 +0800
Implement async consumer (#277)
---
pulsar/asyncio.py | 401 ++++++++++++++++++++++++++++++++++++++++++++++++--
src/client.cc | 23 +++
src/consumer.cc | 64 +++++++-
tests/asyncio_test.py | 220 +++++++++++++++++++++++++--
4 files changed, 680 insertions(+), 28 deletions(-)
diff --git a/pulsar/asyncio.py b/pulsar/asyncio.py
index 445d477..01246c6 100644
--- a/pulsar/asyncio.py
+++ b/pulsar/asyncio.py
@@ -17,15 +17,22 @@
# under the License.
#
+# pylint: disable=no-name-in-module,c-extension-no-member,protected-access
+
"""
The Pulsar Python client APIs that work with the asyncio module.
"""
import asyncio
import functools
-from typing import Any
+from typing import Any, List, Union
import _pulsar
+from _pulsar import (
+ InitialPosition,
+ RegexSubscriptionMode,
+ ConsumerCryptoFailureAction,
+)
import pulsar
class PulsarException(BaseException):
@@ -61,7 +68,7 @@ class Producer:
The Pulsar message producer, used to publish messages on a topic.
"""
- def __init__(self, producer: _pulsar.Producer) -> None:
+ def __init__(self, producer: _pulsar.Producer, schema: pulsar.schema.Schema) -> None:
"""
Create the producer.
Users should not call this constructor directly. Instead, create the
@@ -71,17 +78,21 @@ class Producer:
----------
producer: _pulsar.Producer
The underlying Producer object from the C extension.
+ schema: pulsar.schema.Schema
+ The schema of the data that will be sent by this producer.
"""
- self._producer: _pulsar.Producer = producer
+ self._producer = producer
+ self._schema = schema
- async def send(self, content: bytes) -> pulsar.MessageId:
+ async def send(self, content: Any) -> pulsar.MessageId:
"""
Send a message asynchronously.
parameters
----------
- content: bytes
- The message payload
+ content: Any
+ The message payload, whose type should respect the schema defined in
+ `Client.create_producer`.
Returns
-------
@@ -93,7 +104,7 @@ class Producer:
PulsarException
"""
builder = _pulsar.MessageBuilder()
- builder.content(content)
+ builder.content(self._schema.encode(content))
future = asyncio.get_running_loop().create_future()
self._producer.send_async(builder.build(), functools.partial(_set_future, future))
msg_id = await future
@@ -116,6 +127,177 @@ class Producer:
self._producer.close_async(functools.partial(_set_future, future, value=None))
await future
+class Consumer:
+ """
+ The Pulsar message consumer, used to subscribe to messages from a topic.
+ """
+
+ def __init__(self, consumer: _pulsar.Consumer, schema: pulsar.schema.Schema) -> None:
+ """
+ Create the consumer.
+ Users should not call this constructor directly. Instead, create the
+ consumer via `Client.subscribe`.
+
+ Parameters
+ ----------
+ consumer: _pulsar.Consumer
+ The underlying Consumer object from the C extension.
+ schema: pulsar.schema.Schema
+ The schema of the data that will be received by this consumer.
+ """
+ self._consumer = consumer
+ self._schema = schema
+
+ async def receive(self) -> pulsar.Message:
+ """
+ Receive a single message asynchronously.
+
+ Returns
+ -------
+ pulsar.Message
+ The message received.
+
+ Raises
+ ------
+ PulsarException
+ """
+ future = asyncio.get_running_loop().create_future()
+ self._consumer.receive_async(functools.partial(_set_future, future))
+ msg = await future
+ m = pulsar.Message()
+ m._message = msg
+ m._schema = self._schema
+ return m
+
+ async def acknowledge(
+ self,
+ message: Union[pulsar.Message, pulsar.MessageId,
+ _pulsar.Message, _pulsar.MessageId]
+ ) -> None:
+ """
+ Acknowledge the reception of a single message asynchronously.
+
+ Parameters
+ ----------
+ message : Message, MessageId, _pulsar.Message, _pulsar.MessageId
+ The received message or message id.
+
+ Raises
+ ------
+ PulsarException
+ """
+ future = asyncio.get_running_loop().create_future()
+ if isinstance(message, pulsar.Message):
+ msg = message._message
+ elif isinstance(message, pulsar.MessageId):
+ msg = message._msg_id
+ else:
+ msg = message
+ self._consumer.acknowledge_async(msg, functools.partial(_set_future, future, value=None))
+ await future
+
+ async def acknowledge_cumulative(
+ self,
+ message: Union[pulsar.Message, pulsar.MessageId,
+ _pulsar.Message, _pulsar.MessageId]
+ ) -> None:
+ """
+ Acknowledge the reception of all the messages in the stream up to (and
+ including) the provided message asynchronously.
+
+ Parameters
+ ----------
+ message : Message, MessageId, _pulsar.Message, _pulsar.MessageId
+ The received message or message id.
+
+ Raises
+ ------
+ PulsarException
+ """
+ future = asyncio.get_running_loop().create_future()
+ if isinstance(message, pulsar.Message):
+ msg = message._message
+ elif isinstance(message, pulsar.MessageId):
+ msg = message._msg_id
+ else:
+ msg = message
+ self._consumer.acknowledge_cumulative_async(
+ msg, functools.partial(_set_future, future, value=None)
+ )
+ await future
+
+ async def unsubscribe(self) -> None:
+ """
+ Unsubscribe the current consumer from the topic asynchronously.
+
+ Raises
+ ------
+ PulsarException
+ """
+ future = asyncio.get_running_loop().create_future()
+ self._consumer.unsubscribe_async(functools.partial(_set_future, future, value=None))
+ await future
+
+ async def seek(self, messageid: Union[pulsar.MessageId, int]) -> None:
+ """
+ Reset the subscription associated with this consumer to a specific
+ message id or publish timestamp asynchronously.
+
+ The message id can either be a specific message or represent the first
+ or last messages in the topic.
+
+ Parameters
+ ----------
+ messageid : MessageId or int
+ The message id for seek, OR an integer event time (timestamp) to seek to
+
+ Raises
+ ------
+ PulsarException
+ """
+ future = asyncio.get_running_loop().create_future()
+ if isinstance(messageid, pulsar.MessageId):
+ msg_id = messageid._msg_id
+ elif isinstance(messageid, int):
+ msg_id = messageid
+ else:
+ raise ValueError(f"invalid messageid type {type(messageid)}")
+ self._consumer.seek_async(
+ msg_id, functools.partial(_set_future, future, value=None)
+ )
+ await future
+
+ async def close(self) -> None:
+ """
+ Close the consumer asynchronously.
+
+ Raises
+ ------
+ PulsarException
+ """
+ future = asyncio.get_running_loop().create_future()
+ self._consumer.close_async(functools.partial(_set_future, future, value=None))
+ await future
+
+ def topic(self) -> str:
+ """
+ Return the topic this consumer is subscribed to.
+ """
+ return self._consumer.topic()
+
+ def subscription_name(self) -> str:
+ """
+ Return the subscription name.
+ """
+ return self._consumer.subscription_name()
+
+ def consumer_name(self) -> str:
+ """
+ Return the consumer name.
+ """
+ return self._consumer.consumer_name()
+
class Client:
"""
The asynchronous version of `pulsar.Client`.
@@ -127,7 +309,10 @@ class Client:
"""
self._client: _pulsar.Client = pulsar.Client(service_url, **kwargs)._client
- async def create_producer(self, topic: str) -> Producer:
+ # pylint: disable=too-many-arguments,too-many-locals,too-many-positional-arguments
+ async def create_producer(self, topic: str,
+ schema: pulsar.schema.Schema | None = None,
+ ) -> Producer:
"""
Create a new producer on a given topic
@@ -135,6 +320,8 @@ class Client:
----------
topic: str
The topic name
+ schema: pulsar.schema.Schema | None, default=None
+ Define the schema of the data that will be published by this producer.
Returns
-------
@@ -145,11 +332,199 @@ class Client:
------
PulsarException
"""
+ if schema is None:
+ schema = pulsar.schema.BytesSchema()
+ schema.attach_client(self._client)
+
future = asyncio.get_running_loop().create_future()
conf = _pulsar.ProducerConfiguration()
- # TODO: add more configs
- self._client.create_producer_async(topic, conf, functools.partial(_set_future, future))
- return Producer(await future)
+ conf.schema(schema.schema_info())
+
+ self._client.create_producer_async(
+ topic, conf, functools.partial(_set_future, future)
+ )
+ return Producer(await future, schema)
+
+ # pylint: disable=too-many-arguments,too-many-locals,too-many-branches,too-many-positional-arguments
+ async def subscribe(self, topic: Union[str, List[str]],
+ subscription_name: str,
+ consumer_type: pulsar.ConsumerType =
+ pulsar.ConsumerType.Exclusive,
+ schema: pulsar.schema.Schema | None = None,
+ receiver_queue_size: int = 1000,
+ max_total_receiver_queue_size_across_partitions: int =
+ 50000,
+ consumer_name: str | None = None,
+ unacked_messages_timeout_ms: int | None = None,
+ broker_consumer_stats_cache_time_ms: int = 30000,
+ negative_ack_redelivery_delay_ms: int = 60000,
+ is_read_compacted: bool = False,
+ properties: dict | None = None,
+ initial_position: InitialPosition = InitialPosition.Latest,
+ crypto_key_reader: pulsar.CryptoKeyReader | None = None,
+ replicate_subscription_state_enabled: bool = False,
+ max_pending_chunked_message: int = 10,
+ auto_ack_oldest_chunked_message_on_queue_full: bool = False,
+ start_message_id_inclusive: bool = False,
+ batch_receive_policy: pulsar.ConsumerBatchReceivePolicy | None =
+ None,
+ key_shared_policy: pulsar.ConsumerKeySharedPolicy | None =
+ None,
+ batch_index_ack_enabled: bool = False,
+ regex_subscription_mode: RegexSubscriptionMode =
+ RegexSubscriptionMode.PersistentOnly,
+ dead_letter_policy: pulsar.ConsumerDeadLetterPolicy | None =
+ None,
+ crypto_failure_action: ConsumerCryptoFailureAction =
+ ConsumerCryptoFailureAction.FAIL,
+ is_pattern_topic: bool = False) -> Consumer:
+ """
+ Subscribe to the given topic and subscription combination.
+
+ Parameters
+ ----------
+ topic: str, List[str], or regex pattern
+ The name of the topic, list of topics or regex pattern.
+ When `is_pattern_topic` is True, `topic` is treated as a regex.
+ subscription_name: str
+ The name of the subscription.
+ consumer_type: pulsar.ConsumerType, default=pulsar.ConsumerType.Exclusive
+ Select the subscription type to be used when subscribing to the topic.
+ schema: pulsar.schema.Schema | None, default=None
+ Define the schema of the data that will be received by this consumer.
+ receiver_queue_size: int, default=1000
+ Sets the size of the consumer receive queue.
+ max_total_receiver_queue_size_across_partitions: int, default=50000
+ Set the max total receiver queue size across partitions.
+ consumer_name: str | None, default=None
+ Sets the consumer name.
+ unacked_messages_timeout_ms: int | None, default=None
+ Sets the timeout in milliseconds for unacknowledged messages.
+ broker_consumer_stats_cache_time_ms: int, default=30000
+ Sets the time duration for which the broker-side consumer stats
+ will be cached in the client.
+ negative_ack_redelivery_delay_ms: int, default=60000
+ The delay after which to redeliver the messages that failed to be
+ processed.
+ is_read_compacted: bool, default=False
+ Selects whether to read the compacted version of the topic.
+ properties: dict | None, default=None
+ Sets the properties for the consumer.
+ initial_position: InitialPosition, default=InitialPosition.Latest
+ Set the initial position of a consumer when subscribing to the topic.
+ crypto_key_reader: pulsar.CryptoKeyReader | None, default=None
+ Symmetric encryption class implementation.
+ replicate_subscription_state_enabled: bool, default=False
+ Set whether the subscription status should be replicated.
+ max_pending_chunked_message: int, default=10
+ Consumer buffers chunk messages into memory until it receives all the chunks.
+ auto_ack_oldest_chunked_message_on_queue_full: bool, default=False
+ Automatically acknowledge oldest chunked messages on queue
+ full.
+ start_message_id_inclusive: bool, default=False
+ Set the consumer to include the given position of any reset
+ operation.
+ batch_receive_policy: pulsar.ConsumerBatchReceivePolicy | None, default=None
+ Set the batch collection policy for batch receiving.
+ key_shared_policy: pulsar.ConsumerKeySharedPolicy | None, default=None
+ Set the key shared policy for use when the ConsumerType is
+ KeyShared.
+ batch_index_ack_enabled: bool, default=False
+ Enable the batch index acknowledgement.
+ regex_subscription_mode: RegexSubscriptionMode,
+ default=RegexSubscriptionMode.PersistentOnly
+ Set the regex subscription mode for use when the topic is a regex
+ pattern.
+ dead_letter_policy: pulsar.ConsumerDeadLetterPolicy | None, default=None
+ Set dead letter policy for consumer.
+ crypto_failure_action: ConsumerCryptoFailureAction,
+ default=ConsumerCryptoFailureAction.FAIL
+ Set the behavior when the decryption fails.
+ is_pattern_topic: bool, default=False
+ Whether `topic` is a regex pattern. If it's True when `topic` is a list, a ValueError
+ will be raised.
+
+ Returns
+ -------
+ Consumer
+ The consumer created
+
+ Raises
+ ------
+ PulsarException
+ """
+ if schema is None:
+ schema = pulsar.schema.BytesSchema()
+
+ future = asyncio.get_running_loop().create_future()
+ conf = _pulsar.ConsumerConfiguration()
+ conf.consumer_type(consumer_type)
+ conf.regex_subscription_mode(regex_subscription_mode)
+ conf.read_compacted(is_read_compacted)
+ conf.receiver_queue_size(receiver_queue_size)
+ conf.max_total_receiver_queue_size_across_partitions(
+ max_total_receiver_queue_size_across_partitions
+ )
+ if consumer_name:
+ conf.consumer_name(consumer_name)
+ if unacked_messages_timeout_ms:
+ conf.unacked_messages_timeout_ms(unacked_messages_timeout_ms)
+
+ conf.negative_ack_redelivery_delay_ms(negative_ack_redelivery_delay_ms)
+ conf.broker_consumer_stats_cache_time_ms(broker_consumer_stats_cache_time_ms)
+ if properties:
+ for k, v in properties.items():
+ conf.property(k, v)
+ conf.subscription_initial_position(initial_position)
+
+ conf.schema(schema.schema_info())
+
+ if crypto_key_reader:
+ conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
+
+ conf.replicate_subscription_state_enabled(replicate_subscription_state_enabled)
+ conf.max_pending_chunked_message(max_pending_chunked_message)
+ conf.auto_ack_oldest_chunked_message_on_queue_full(
+ auto_ack_oldest_chunked_message_on_queue_full
+ )
+ conf.start_message_id_inclusive(start_message_id_inclusive)
+ if batch_receive_policy:
+ conf.batch_receive_policy(batch_receive_policy.policy())
+
+ if key_shared_policy:
+ conf.key_shared_policy(key_shared_policy.policy())
+ conf.batch_index_ack_enabled(batch_index_ack_enabled)
+ if dead_letter_policy:
+ conf.dead_letter_policy(dead_letter_policy.policy())
+ conf.crypto_failure_action(crypto_failure_action)
+
+ if isinstance(topic, str):
+ if is_pattern_topic:
+ self._client.subscribe_async_pattern(
+ topic, subscription_name, conf,
+ functools.partial(_set_future, future)
+ )
+ else:
+ self._client.subscribe_async(
+ topic, subscription_name, conf,
+ functools.partial(_set_future, future)
+ )
+ elif isinstance(topic, list):
+ if is_pattern_topic:
+ raise ValueError(
+ "Argument 'topic' must be a string when "
+ "'is_pattern_topic' is True; lists of topics do not "
+ "support pattern subscriptions"
+ )
+ self._client.subscribe_async_topics(
+ topic, subscription_name, conf,
+ functools.partial(_set_future, future)
+ )
+ else:
+ raise ValueError( "Argument 'topic' is expected to be of type 'str' or 'list'")
+
+ schema.attach_client(self._client)
+ return Consumer(await future, schema)
async def close(self) -> None:
"""
@@ -160,7 +535,9 @@ class Client:
PulsarException
"""
future = asyncio.get_running_loop().create_future()
- self._client.close_async(functools.partial(_set_future, future, value=None))
+ self._client.close_async(
+ functools.partial(_set_future, future, value=None)
+ )
await future
def _set_future(future: asyncio.Future, result: _pulsar.Result, value: Any):
diff --git a/src/client.cc b/src/client.cc
index 72c824f..64056df 100644
--- a/src/client.cc
+++ b/src/client.cc
@@ -80,6 +80,26 @@ void Client_closeAsync(Client& client, ResultCallback callback) {
client.closeAsync(callback);
}
+void Client_subscribeAsync(Client& client, const std::string& topic, const std::string& subscriptionName,
+ ConsumerConfiguration conf, SubscribeCallback callback) {
+ py::gil_scoped_release release;
+ client.subscribeAsync(topic, subscriptionName, conf, callback);
+}
+
+void Client_subscribeAsync_topics(Client& client, const std::vector<std::string>& topics,
+ const std::string& subscriptionName, ConsumerConfiguration conf,
+ SubscribeCallback callback) {
+ py::gil_scoped_release release;
+ client.subscribeAsync(topics, subscriptionName, conf, callback);
+}
+
+void Client_subscribeAsync_pattern(Client& client, const std::string& topic_pattern,
+ const std::string& subscriptionName, ConsumerConfiguration conf,
+ SubscribeCallback callback) {
+ py::gil_scoped_release release;
+ client.subscribeWithRegexAsync(topic_pattern, subscriptionName, conf, callback);
+}
+
void export_client(py::module_& m) {
py::class_<Client, std::shared_ptr<Client>>(m, "Client")
.def(py::init<const std::string&, const ClientConfiguration&>())
@@ -99,5 +119,8 @@ void export_client(py::module_& m) {
.def("get_schema_info", &Client_getSchemaInfo)
.def("close", &Client_close)
.def("close_async", &Client_closeAsync)
+ .def("subscribe_async", &Client_subscribeAsync)
+ .def("subscribe_async_topics", &Client_subscribeAsync_topics)
+ .def("subscribe_async_pattern", &Client_subscribeAsync_pattern)
.def("shutdown", &Client::shutdown);
}
diff --git a/src/consumer.cc b/src/consumer.cc
index e32a865..f1d7367 100644
--- a/src/consumer.cc
+++ b/src/consumer.cc
@@ -19,6 +19,7 @@
#include "utils.h"
#include <pulsar/Consumer.h>
+#include <pybind11/functional.h>
#include <pybind11/pybind11.h>
#include <pybind11/stl.h>
@@ -100,12 +101,58 @@ MessageId Consumer_get_last_message_id(Consumer& consumer) {
MessageId msgId;
Result res;
Py_BEGIN_ALLOW_THREADS res = consumer.getLastMessageId(msgId);
- Py_END_ALLOW_THREADS
-
- CHECK_RESULT(res);
+ Py_END_ALLOW_THREADS;
+ CHECK_RESULT(res);
return msgId;
}
+void Consumer_receiveAsync(Consumer& consumer, ReceiveCallback callback) {
+ py::gil_scoped_release release;
+ consumer.receiveAsync(callback);
+}
+
+void Consumer_acknowledgeAsync(Consumer& consumer, const Message& msg, ResultCallback callback) {
+ py::gil_scoped_release release;
+ consumer.acknowledgeAsync(msg, callback);
+}
+
+void Consumer_acknowledgeAsync_message_id(Consumer& consumer, const MessageId& msgId,
+ ResultCallback callback) {
+ py::gil_scoped_release release;
+ consumer.acknowledgeAsync(msgId, callback);
+}
+
+void Consumer_acknowledgeCumulativeAsync(Consumer& consumer, const Message& msg, ResultCallback callback) {
+ py::gil_scoped_release release;
+ consumer.acknowledgeCumulativeAsync(msg, callback);
+}
+
+void Consumer_acknowledgeCumulativeAsync_message_id(Consumer& consumer, const MessageId& msgId,
+ ResultCallback callback) {
+ py::gil_scoped_release release;
+ consumer.acknowledgeCumulativeAsync(msgId, callback);
+}
+
+void Consumer_closeAsync(Consumer& consumer, ResultCallback callback) {
+ py::gil_scoped_release release;
+ consumer.closeAsync(callback);
+}
+
+void Consumer_unsubscribeAsync(Consumer& consumer, ResultCallback callback) {
+ py::gil_scoped_release release;
+ consumer.unsubscribeAsync(callback);
+}
+
+void Consumer_seekAsync(Consumer& consumer, const MessageId& msgId, ResultCallback callback) {
+ py::gil_scoped_release release;
+ consumer.seekAsync(msgId, callback);
+}
+
+void Consumer_seekAsync_timestamp(Consumer& consumer, uint64_t timestamp, ResultCallback callback) {
+ py::gil_scoped_release release;
+ consumer.seekAsync(timestamp, callback);
+}
+
void export_consumer(py::module_& m) {
py::class_<Consumer>(m, "Consumer")
.def(py::init<>())
@@ -130,5 +177,14 @@ void export_consumer(py::module_& m) {
.def("seek", &Consumer_seek)
.def("seek", &Consumer_seek_timestamp)
.def("is_connected", &Consumer_is_connected)
- .def("get_last_message_id", &Consumer_get_last_message_id);
+ .def("get_last_message_id", &Consumer_get_last_message_id)
+ .def("receive_async", &Consumer_receiveAsync)
+ .def("acknowledge_async", &Consumer_acknowledgeAsync)
+ .def("acknowledge_async", &Consumer_acknowledgeAsync_message_id)
+ .def("acknowledge_cumulative_async", &Consumer_acknowledgeCumulativeAsync)
+ .def("acknowledge_cumulative_async", &Consumer_acknowledgeCumulativeAsync_message_id)
+ .def("close_async", &Consumer_closeAsync)
+ .def("unsubscribe_async", &Consumer_unsubscribeAsync)
+ .def("seek_async", &Consumer_seekAsync)
+ .def("seek_async", &Consumer_seekAsync_timestamp);
}
diff --git a/tests/asyncio_test.py b/tests/asyncio_test.py
index fe6877f..656ffba 100644
--- a/tests/asyncio_test.py
+++ b/tests/asyncio_test.py
@@ -18,30 +18,49 @@
# under the License.
#
+"""
+Unit tests for asyncio Pulsar client API.
+"""
+
+# pylint: disable=missing-function-docstring
+
import asyncio
-import pulsar
-from pulsar.asyncio import (
- Client,
- PulsarException,
-)
+import time
+from typing import List
from unittest import (
main,
IsolatedAsyncioTestCase,
)
-service_url = 'pulsar://localhost:6650'
+import pulsar # pylint: disable=import-error
+from pulsar.asyncio import ( # pylint: disable=import-error
+ Client,
+ Consumer,
+ Producer,
+ PulsarException,
+)
+from pulsar.schema import ( # pylint: disable=import-error
+ AvroSchema,
+ Integer,
+ Record,
+ String,
+)
+
+SERVICE_URL = 'pulsar://localhost:6650'
class AsyncioTest(IsolatedAsyncioTestCase):
+ """Test cases for asyncio Pulsar client."""
async def asyncSetUp(self) -> None:
- self._client = Client(service_url,
+ self._client = Client(SERVICE_URL,
operation_timeout_seconds=5)
async def asyncTearDown(self) -> None:
await self._client.close()
- async def test_batch_send(self):
- producer = await self._client.create_producer('awaitio-test-batch-send')
+ async def test_batch_end_to_end(self):
+ topic = f'asyncio-test-batch-e2e-{time.time()}'
+ producer = await self._client.create_producer(topic)
tasks = []
for i in range(5):
tasks.append(asyncio.create_task(producer.send(f'msg-{i}'.encode())))
@@ -58,15 +77,28 @@ class AsyncioTest(IsolatedAsyncioTestCase):
self.assertEqual(msg_ids[i].entry_id(), entry_id)
self.assertEqual(msg_ids[i].batch_index(), i)
+ consumer = await self._client.subscribe(topic, 'sub',
+ initial_position=pulsar.InitialPosition.Earliest)
+ for i in range(5):
+ msg = await consumer.receive()
+ self.assertEqual(msg.data(), f'msg-{i}'.encode())
+ await consumer.close()
+
+ # create a different subscription to verify initial position is latest by default
+ consumer = await self._client.subscribe(topic, 'sub2')
+ await producer.send(b'final-message')
+ msg = await consumer.receive()
+ self.assertEqual(msg.data(), b'final-message')
+
async def test_create_producer_failure(self):
try:
- await self._client.create_producer('tenant/ns/awaitio-test-send-failure')
+ await self._client.create_producer('tenant/ns/asyncio-test-send-failure')
self.fail()
except PulsarException as e:
self.assertEqual(e.error(), pulsar.Result.Timeout)
async def test_send_failure(self):
- producer = await self._client.create_producer('awaitio-test-send-failure')
+ producer = await self._client.create_producer('asyncio-test-send-failure')
try:
await producer.send(('x' * 1024 * 1024 * 10).encode())
self.fail()
@@ -74,7 +106,7 @@ class AsyncioTest(IsolatedAsyncioTestCase):
self.assertEqual(e.error(), pulsar.Result.MessageTooBig)
async def test_close_producer(self):
- producer = await self._client.create_producer('awaitio-test-close-producer')
+ producer = await self._client.create_producer('asyncio-test-close-producer')
await producer.close()
try:
await producer.close()
@@ -82,5 +114,169 @@ class AsyncioTest(IsolatedAsyncioTestCase):
except PulsarException as e:
self.assertEqual(e.error(), pulsar.Result.AlreadyClosed)
+ async def _prepare_messages(self, producer: Producer) -> List[pulsar.MessageId]:
+ msg_ids = []
+ for i in range(5):
+ msg_ids.append(await producer.send(f'msg-{i}'.encode()))
+ return msg_ids
+
+ async def test_consumer_cumulative_acknowledge(self):
+ topic = f'asyncio-test-consumer-cumulative-ack-{time.time()}'
+ sub = 'sub'
+ consumer = await self._client.subscribe(topic, sub)
+ producer = await self._client.create_producer(topic)
+ await self._prepare_messages(producer)
+ last_msg = None
+ for _ in range(5):
+ last_msg = await consumer.receive()
+ await consumer.acknowledge_cumulative(last_msg)
+ await consumer.close()
+
+ consumer = await self._client.subscribe(topic, sub)
+ await producer.send(b'final-message')
+ msg = await consumer.receive()
+ self.assertEqual(msg.data(), b'final-message')
+
+ async def test_consumer_individual_acknowledge(self):
+ topic = f'asyncio-test-consumer-individual-ack-{time.time()}'
+ sub = 'sub'
+ consumer = await self._client.subscribe(topic, sub,
+ consumer_type=pulsar.ConsumerType.Shared)
+ producer = await self._client.create_producer(topic)
+ await self._prepare_messages(producer)
+ msgs = []
+ for _ in range(5):
+ msg = await consumer.receive()
+ msgs.append(msg)
+
+ await consumer.acknowledge(msgs[0])
+ await consumer.acknowledge(msgs[2])
+ await consumer.acknowledge(msgs[4])
+ await consumer.close()
+
+ consumer = await self._client.subscribe(topic, sub,
+ consumer_type=pulsar.ConsumerType.Shared)
+ msg = await consumer.receive()
+ self.assertEqual(msg.data(), b'msg-1')
+ msg = await consumer.receive()
+ self.assertEqual(msg.data(), b'msg-3')
+
+ async def test_multi_topic_consumer(self):
+ topics = ['asyncio-test-multi-topic-1', 'asyncio-test-multi-topic-2']
+ producers = []
+
+ for topic in topics:
+ producer = await self._client.create_producer(topic)
+ producers.append(producer)
+
+ consumer = await self._client.subscribe(topics, 'test-multi-subscription')
+
+ await producers[0].send(b'message-from-topic-1')
+ await producers[1].send(b'message-from-topic-2')
+
+ async def verify_receive(consumer: Consumer):
+ received_messages = {}
+ for _ in range(2):
+ msg = await consumer.receive()
+ received_messages[msg.data()] = None
+ await consumer.acknowledge(msg.message_id())
+ self.assertEqual(received_messages, {
+ b'message-from-topic-1': None,
+ b'message-from-topic-2': None
+ })
+
+ await verify_receive(consumer)
+ await consumer.close()
+
+ consumer = await self._client.subscribe('public/default/asyncio-test-multi-topic-.*',
+ 'test-multi-subscription-2',
+ is_pattern_topic=True,
+ initial_position=pulsar.InitialPosition.Earliest)
+ await verify_receive(consumer)
+ await consumer.close()
+
+ async def test_unsubscribe(self):
+ topic = f'asyncio-test-unsubscribe-{time.time()}'
+ sub = 'sub'
+ consumer = await self._client.subscribe(topic, sub)
+ await consumer.unsubscribe()
+ # Verify the consumer can be created successfully with the same subscription name
+ consumer = await self._client.subscribe(topic, sub)
+ await consumer.close()
+
+ async def test_seek_message_id(self):
+ topic = f'asyncio-test-seek-message-id-{time.time()}'
+ sub = 'sub'
+
+ producer = await self._client.create_producer(topic)
+ msg_ids = await self._prepare_messages(producer)
+
+ consumer = await self._client.subscribe(
+ topic, sub, initial_position=pulsar.InitialPosition.Earliest
+ )
+ await consumer.seek(msg_ids[2])
+ msg = await consumer.receive()
+ self.assertEqual(msg.data(), b'msg-3')
+ await consumer.close()
+
+ consumer = await self._client.subscribe(
+ topic, sub, initial_position=pulsar.InitialPosition.Earliest,
+ start_message_id_inclusive=True
+ )
+ await consumer.seek(msg_ids[2])
+ msg = await consumer.receive()
+ self.assertEqual(msg.data(), b'msg-2')
+ await consumer.close()
+
+ async def test_seek_timestamp(self):
+ topic = f'asyncio-test-seek-timestamp-{time.time()}'
+ sub = 'sub'
+ consumer = await self._client.subscribe(
+ topic, sub, initial_position=pulsar.InitialPosition.Earliest
+ )
+
+ producer = await self._client.create_producer(topic)
+
+ # Send first 3 messages
+ for i in range(3):
+ await producer.send(f'msg-{i}'.encode())
+
+ seek_time = int(time.time() * 1000)
+
+ # Send 2 more messages
+ for i in range(3, 5):
+ await producer.send(f'msg-{i}'.encode())
+
+ # Consume all messages first
+ for i in range(5):
+ msg = await consumer.receive()
+ self.assertEqual(msg.data(), f'msg-{i}'.encode())
+
+ # Seek to the timestamp (should start from msg-3)
+ await consumer.seek(seek_time)
+
+ msg = await consumer.receive()
+ self.assertEqual(msg.data(), b'msg-3')
+
+ async def test_schema(self):
+ class ExampleRecord(Record): # pylint: disable=too-few-public-methods
+ """Example record schema for testing."""
+ str_field = String()
+ int_field = Integer()
+
+ topic = f'asyncio-test-schema-{time.time()}'
+ producer = await self._client.create_producer(
+ topic, schema=AvroSchema(ExampleRecord)
+ )
+ consumer = await self._client.subscribe(
+ topic, 'sub', schema=AvroSchema(ExampleRecord)
+ )
+ await producer.send(ExampleRecord(str_field='test', int_field=42))
+ msg = await consumer.receive()
+ self.assertIsInstance(msg.value(), ExampleRecord)
+ self.assertEqual(msg.value().str_field, 'test')
+ self.assertEqual(msg.value().int_field, 42)
+
+
if __name__ == '__main__':
main()