This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-python.git
The following commit(s) were added to refs/heads/main by this push:
new 9432cb8 Improve asynchronous producer with more options for creation
and send (#280)
9432cb8 is described below
commit 9432cb8354dedf1f402a9b698ea2d75bee83d492
Author: Yunze Xu <[email protected]>
AuthorDate: Wed Dec 24 22:08:36 2025 +0800
Improve asynchronous producer with more options for creation and send (#280)
---
pulsar/asyncio.py | 203 +++++++++++++++++++++++++++++++++++++++++++++++++-
src/producer.cc | 3 +-
tests/asyncio_test.py | 44 ++++++++++-
3 files changed, 246 insertions(+), 4 deletions(-)
diff --git a/pulsar/asyncio.py b/pulsar/asyncio.py
index 01246c6..5c3178a 100644
--- a/pulsar/asyncio.py
+++ b/pulsar/asyncio.py
@@ -25,11 +25,16 @@ The Pulsar Python client APIs that work with the asyncio
module.
import asyncio
import functools
-from typing import Any, List, Union
+from datetime import timedelta
+from typing import Any, Callable, List, Union
import _pulsar
from _pulsar import (
InitialPosition,
+ CompressionType,
+ PartitionsRoutingMode,
+ BatchingType,
+ ProducerAccessMode,
RegexSubscriptionMode,
ConsumerCryptoFailureAction,
)
@@ -84,7 +89,17 @@ class Producer:
self._producer = producer
self._schema = schema
- async def send(self, content: Any) -> pulsar.MessageId:
+ # pylint: disable=too-many-arguments,too-many-locals,too-many-positional-arguments
+ async def send(self, content: Any,
+ properties: dict | None = None,
+ partition_key: str | None = None,
+ ordering_key: str | None = None,
+ sequence_id: int | None = None,
+ replication_clusters: List[str] | None = None,
+ disable_replication: bool | None = None,
+ event_timestamp: int | None = None,
+ deliver_at: int | None = None,
+ deliver_after: timedelta | None = None) -> pulsar.MessageId:
"""
Send a message asynchronously.
@@ -93,6 +108,28 @@ class Producer:
content: Any
The message payload, whose type should respect the schema defined
in
`Client.create_producer`.
+ properties: dict | None
+ A dict of application-defined string properties.
+ partition_key: str | None
+ Sets the partition key for the message routing. A hash of this key is
+ used to determine the message's topic partition.
+ ordering_key: str | None
+ Sets the ordering key for the message routing.
+ sequence_id: int | None
+ Specify a custom sequence id for the message being published.
+ replication_clusters: List[str] | None
+ Override namespace replication clusters. Note that it is the caller's responsibility
+ to provide valid cluster names and that all clusters have been previously configured
+ as topics. Given an empty list, the message will replicate per the namespace
+ configuration.
+ disable_replication: bool | None
+ Do not replicate this message.
+ event_timestamp: int | None
+ Timestamp in millis of the timestamp of event creation
+ deliver_at: int | None
+ Specify the message should not be delivered earlier than the specified timestamp.
+ deliver_after: timedelta | None
+ Specify a delay in timedelta for the delivery of the messages.
Returns
-------
@@ -105,6 +142,27 @@ class Producer:
"""
builder = _pulsar.MessageBuilder()
builder.content(self._schema.encode(content))
+
+ if properties is not None:
+ for k, v in properties.items():
+ builder.property(k, v)
+ if partition_key is not None:
+ builder.partition_key(partition_key)
+ if ordering_key is not None:
+ builder.ordering_key(ordering_key)
+ if sequence_id is not None:
+ builder.sequence_id(sequence_id)
+ if replication_clusters is not None:
+ builder.replication_clusters(replication_clusters)
+ if disable_replication is not None:
+ builder.disable_replication(disable_replication)
+ if event_timestamp is not None:
+ builder.event_timestamp(event_timestamp)
+ if deliver_at is not None:
+ builder.deliver_at(deliver_at)
+ if deliver_after is not None:
+ builder.deliver_after(deliver_after)
+
future = asyncio.get_running_loop().create_future()
self._producer.send_async(builder.build(),
functools.partial(_set_future, future))
msg_id = await future
@@ -115,6 +173,18 @@ class Producer:
msg_id.batch_index(),
)
+ async def flush(self) -> None:
+ """
+ Flush all the messages buffered in the producer asynchronously.
+
+ Raises
+ ------
+ PulsarException
+ """
+ future = asyncio.get_running_loop().create_future()
+ self._producer.flush_async(functools.partial(_set_future, future, value=None))
+ await future
+
async def close(self) -> None:
"""
Close the producer.
@@ -127,6 +197,30 @@ class Producer:
self._producer.close_async(functools.partial(_set_future, future,
value=None))
await future
+ def topic(self):
+ """
+ Return the topic which producer is publishing to
+ """
+ return self._producer.topic()
+
+ def producer_name(self):
+ """
+ Return the producer name which could have been assigned by the
+ system or specified by the client
+ """
+ return self._producer.producer_name()
+
+ def last_sequence_id(self):
+ """
+ Return the last sequence id that was published and acknowledged by this producer.
+
+ The sequence id can be either automatically assigned or custom set on the message.
+ After recreating a producer with the same name, this will return the sequence id
+ of the last message that was published in the previous session, or -1 if no
+ message was ever published.
+ """
+ return self._producer.last_sequence_id()
+
class Consumer:
"""
The Pulsar message consumer, used to subscribe to messages from a topic.
@@ -311,7 +405,28 @@ class Client:
# pylint: disable=too-many-arguments,too-many-locals,too-many-positional-arguments
async def create_producer(self, topic: str,
+ producer_name: str | None = None,
schema: pulsar.schema.Schema | None = None,
+ initial_sequence_id: int | None = None,
+ send_timeout_millis: int = 30000,
+ compression_type: CompressionType = CompressionType.NONE,
+ max_pending_messages: int = 1000,
+ max_pending_messages_across_partitions: int = 50000,
+ block_if_queue_full: bool = False,
+ batching_enabled: bool = True,
+ batching_max_messages: int = 1000,
+ batching_max_allowed_size_in_bytes: int = 128*1024,
+ batching_max_publish_delay_ms: int = 10,
+ chunking_enabled: bool = False,
+ message_routing_mode: PartitionsRoutingMode =
+ PartitionsRoutingMode.RoundRobinDistribution,
+ lazy_start_partitioned_producers: bool = False,
+ properties: dict | None = None,
+ batching_type: BatchingType = BatchingType.Default,
+ encryption_key: str | None = None,
+ crypto_key_reader: pulsar.CryptoKeyReader | None = None,
+ access_mode: ProducerAccessMode = ProducerAccessMode.Shared,
+ message_router: Callable[[pulsar.Message, int], int] | None = None,
) -> Producer:
"""
Create a new producer on a given topic
@@ -320,8 +435,60 @@ class Client:
----------
topic: str
The topic name
+ producer_name: str | None
+ Specify a name for the producer. If not assigned, the system will generate a globally
+ unique name which can be accessed with `Producer.producer_name()`. When specifying a
+ name, it is up to the user to ensure that, for a given topic, the producer name is
+ unique across all Pulsar's clusters.
schema: pulsar.schema.Schema | None, default=None
Define the schema of the data that will be published by this
producer.
+ initial_sequence_id: int | None, default=None
+ Set the baseline for the sequence ids for messages published by
+ the producer.
+ send_timeout_millis: int, default=30000
+ If a message is not acknowledged by the server before the
+ send_timeout expires, an error will be reported.
+ compression_type: CompressionType, default=CompressionType.NONE
+ Set the compression type for the producer.
+ max_pending_messages: int, default=1000
+ Set the max size of the queue holding the messages pending to
+ receive an acknowledgment from the broker.
+ max_pending_messages_across_partitions: int, default=50000
+ Set the max size of the queue holding the messages pending to
+ receive an acknowledgment across partitions.
+ block_if_queue_full: bool, default=False
+ Set whether send operations should block when the outgoing
+ message queue is full.
+ batching_enabled: bool, default=True
+ Enable automatic message batching. Note that, unlike the synchronous producer API in
+ ``pulsar.Client.create_producer``, batching is enabled by default for the asyncio
+ producer.
+ batching_max_messages: int, default=1000
+ Maximum number of messages in a batch.
+ batching_max_allowed_size_in_bytes: int, default=128*1024
+ Maximum size in bytes of a batch.
+ batching_max_publish_delay_ms: int, default=10
+ The batch interval in milliseconds.
+ chunking_enabled: bool, default=False
+ Enable chunking of large messages.
+ message_routing_mode: PartitionsRoutingMode,
+ default=PartitionsRoutingMode.RoundRobinDistribution
+ Set the message routing mode for the partitioned producer.
+ lazy_start_partitioned_producers: bool, default=False
+ Start partitioned producers lazily on demand.
+ properties: dict | None, default=None
+ Sets the properties for the producer.
+ batching_type: BatchingType, default=BatchingType.Default
+ Sets the batching type for the producer.
+ encryption_key: str | None, default=None
+ The key used for symmetric encryption.
+ crypto_key_reader: pulsar.CryptoKeyReader | None, default=None
+ Symmetric encryption class implementation.
+ access_mode: ProducerAccessMode, default=ProducerAccessMode.Shared
+ Set the type of access mode that the producer requires on the topic.
+ message_router: Callable[[pulsar.Message, int], int] | None, default=None
+ A custom message router function that takes a Message and the
+ number of partitions and returns the partition index.
Returns
-------
@@ -332,13 +499,45 @@ class Client:
------
PulsarException
"""
+ if batching_enabled and chunking_enabled:
+ raise ValueError("Batching and chunking of messages can't be enabled together.")
+
if schema is None:
schema = pulsar.schema.BytesSchema()
schema.attach_client(self._client)
future = asyncio.get_running_loop().create_future()
conf = _pulsar.ProducerConfiguration()
+ if producer_name is not None:
+ conf.producer_name(producer_name)
conf.schema(schema.schema_info())
+ if initial_sequence_id is not None:
+ conf.initial_sequence_id(initial_sequence_id)
+ conf.send_timeout_millis(send_timeout_millis)
+ conf.compression_type(compression_type)
+ conf.max_pending_messages(max_pending_messages)
+ conf.max_pending_messages_across_partitions(max_pending_messages_across_partitions)
+ conf.block_if_queue_full(block_if_queue_full)
+ conf.batching_enabled(batching_enabled)
+ conf.batching_max_messages(batching_max_messages)
+ conf.batching_max_allowed_size_in_bytes(batching_max_allowed_size_in_bytes)
+ conf.batching_max_publish_delay_ms(batching_max_publish_delay_ms)
+ conf.chunking_enabled(chunking_enabled)
+ conf.partitions_routing_mode(message_routing_mode)
+ conf.lazy_start_partitioned_producers(lazy_start_partitioned_producers)
+ if properties is not None:
+ for k, v in properties.items():
+ conf.property(k, v)
+ conf.batching_type(batching_type)
+ if encryption_key is not None:
+ conf.encryption_key(encryption_key)
+ if crypto_key_reader is not None:
+ conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
+ conf.access_mode(access_mode)
+ if message_router is not None:
+ def underlying_router(msg: _pulsar.Message, num_partitions: int) -> int:
+ return message_router(pulsar.Message._wrap(msg), num_partitions)
+ conf.message_router(underlying_router)
self._client.create_producer_async(
topic, conf, functools.partial(_set_future, future)
diff --git a/src/producer.cc b/src/producer.cc
index 9b38016..6a9b1a5 100644
--- a/src/producer.cc
+++ b/src/producer.cc
@@ -82,5 +82,6 @@ void export_producer(py::module_& m) {
"successfully persisted\n")
.def("close", &Producer_close)
.def("close_async", &Producer_closeAsync)
- .def("is_connected", &Producer::isConnected);
+ .def("is_connected", &Producer::isConnected)
+ .def("flush_async", &Producer::flushAsync);
}
diff --git a/tests/asyncio_test.py b/tests/asyncio_test.py
index 656ffba..048dc43 100644
--- a/tests/asyncio_test.py
+++ b/tests/asyncio_test.py
@@ -60,12 +60,18 @@ class AsyncioTest(IsolatedAsyncioTestCase):
async def test_batch_end_to_end(self):
topic = f'asyncio-test-batch-e2e-{time.time()}'
- producer = await self._client.create_producer(topic)
+ producer = await self._client.create_producer(topic,
+ producer_name="my-producer")
+ self.assertEqual(producer.topic(), f'persistent://public/default/{topic}')
+ self.assertEqual(producer.producer_name(), "my-producer")
tasks = []
for i in range(5):
tasks.append(asyncio.create_task(producer.send(f'msg-{i}'.encode())))
msg_ids = await asyncio.gather(*tasks)
self.assertEqual(len(msg_ids), 5)
+ # pylint: disable=fixme
+ # TODO: the result is wrong due to https://github.com/apache/pulsar-client-cpp/issues/531
+ self.assertEqual(producer.last_sequence_id(), 8)
ledger_id = msg_ids[0].ledger_id()
entry_id = msg_ids[0].entry_id()
# These messages should be in the same entry
@@ -90,6 +96,42 @@ class AsyncioTest(IsolatedAsyncioTestCase):
msg = await consumer.receive()
self.assertEqual(msg.data(), b'final-message')
+ async def test_send_keyed_message(self):
+ topic = f'asyncio-test-send-keyed-message-{time.time()}'
+ producer = await self._client.create_producer(topic)
+ consumer = await self._client.subscribe(topic, 'sub')
+ await producer.send(b'msg', partition_key='key0',
+ ordering_key="key1", properties={'my-prop': 'my-value'})
+
+ msg = await consumer.receive()
+ self.assertEqual(msg.data(), b'msg')
+ self.assertEqual(msg.partition_key(), 'key0')
+ self.assertEqual(msg.ordering_key(), 'key1')
+ self.assertEqual(msg.properties(), {'my-prop': 'my-value'})
+
+ async def test_flush(self):
+ topic = f'asyncio-test-flush-{time.time()}'
+ producer = await self._client.create_producer(topic, batching_max_messages=3,
+ batching_max_publish_delay_ms=60000)
+ tasks = []
+ tasks.append(asyncio.create_task(producer.send(b'msg-0')))
+ tasks.append(asyncio.create_task(producer.send(b'msg-1')))
+
+ done, pending = await asyncio.wait(tasks, timeout=1, return_when=asyncio.FIRST_COMPLETED)
+ self.assertEqual(len(done), 0)
+ self.assertEqual(len(pending), 2)
+
+ # flush will trigger sending the batched messages
+ await producer.flush()
+ for task in pending:
+ self.assertTrue(task.done())
+ msg_id0 = tasks[0].result()
+ msg_id1 = tasks[1].result()
+ self.assertEqual(msg_id0.ledger_id(), msg_id1.ledger_id())
+ self.assertEqual(msg_id0.entry_id(), msg_id1.entry_id())
+ self.assertEqual(msg_id0.batch_index(), 0)
+ self.assertEqual(msg_id1.batch_index(), 1)
+
async def test_create_producer_failure(self):
try:
await self._client.create_producer('tenant/ns/asyncio-test-send-failure')