This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 9432cb8  Improve asynchronous producer with more options for creation 
and send (#280)
9432cb8 is described below

commit 9432cb8354dedf1f402a9b698ea2d75bee83d492
Author: Yunze Xu <[email protected]>
AuthorDate: Wed Dec 24 22:08:36 2025 +0800

    Improve asynchronous producer with more options for creation and send (#280)
---
 pulsar/asyncio.py     | 203 +++++++++++++++++++++++++++++++++++++++++++++++++-
 src/producer.cc       |   3 +-
 tests/asyncio_test.py |  44 ++++++++++-
 3 files changed, 246 insertions(+), 4 deletions(-)

diff --git a/pulsar/asyncio.py b/pulsar/asyncio.py
index 01246c6..5c3178a 100644
--- a/pulsar/asyncio.py
+++ b/pulsar/asyncio.py
@@ -25,11 +25,16 @@ The Pulsar Python client APIs that work with the asyncio 
module.
 
 import asyncio
 import functools
-from typing import Any, List, Union
+from datetime import timedelta
+from typing import Any, Callable, List, Union
 
 import _pulsar
 from _pulsar import (
     InitialPosition,
+    CompressionType,
+    PartitionsRoutingMode,
+    BatchingType,
+    ProducerAccessMode,
     RegexSubscriptionMode,
     ConsumerCryptoFailureAction,
 )
@@ -84,7 +89,17 @@ class Producer:
         self._producer = producer
         self._schema = schema
 
-    async def send(self, content: Any) -> pulsar.MessageId:
+    # pylint: 
disable=too-many-arguments,too-many-locals,too-many-positional-arguments
+    async def send(self, content: Any,
+                   properties: dict | None = None,
+                   partition_key: str | None = None,
+                   ordering_key: str | None = None,
+                   sequence_id: int | None = None,
+                   replication_clusters: List[str] | None = None,
+                   disable_replication: bool | None = None,
+                   event_timestamp: int | None = None,
+                   deliver_at: int | None = None,
+                   deliver_after: timedelta | None = None) -> pulsar.MessageId:
         """
         Send a message asynchronously.
 
@@ -93,6 +108,28 @@ class Producer:
         content: Any
             The message payload, whose type should respect the schema defined 
in
             `Client.create_producer`.
+        properties: dict | None
+            A dict of application-defined string properties.
+        partition_key: str | None
+            Sets the partition key for the message routing. A hash of this key 
is
+            used to determine the message's topic partition.
+        ordering_key: str | None
+            Sets the ordering key for the message routing.
+        sequence_id: int | None
+            Specify a custom sequence id for the message being published.
+        replication_clusters: List[str] | None
+            Override namespace replication clusters. Note that it is the 
caller's responsibility
+            to provide valid cluster names and that all clusters have been 
previously configured
+            as topics. Given an empty list, the message will replicate per the 
namespace
+            configuration.
+        disable_replication: bool | None
+            Do not replicate this message.
+        event_timestamp: int | None
+            Timestamp in millis of the timestamp of event creation
+        deliver_at: int | None
+            Specify the message should not be delivered earlier than the 
specified timestamp.
+        deliver_after: timedelta | None
+            Specify a delay in timedelta for the delivery of the messages.
 
         Returns
         -------
@@ -105,6 +142,27 @@ class Producer:
         """
         builder = _pulsar.MessageBuilder()
         builder.content(self._schema.encode(content))
+
+        if properties is not None:
+            for k, v in properties.items():
+                builder.property(k, v)
+        if partition_key is not None:
+            builder.partition_key(partition_key)
+        if ordering_key is not None:
+            builder.ordering_key(ordering_key)
+        if sequence_id is not None:
+            builder.sequence_id(sequence_id)
+        if replication_clusters is not None:
+            builder.replication_clusters(replication_clusters)
+        if disable_replication is not None:
+            builder.disable_replication(disable_replication)
+        if event_timestamp is not None:
+            builder.event_timestamp(event_timestamp)
+        if deliver_at is not None:
+            builder.deliver_at(deliver_at)
+        if deliver_after is not None:
+            builder.deliver_after(deliver_after)
+
         future = asyncio.get_running_loop().create_future()
         self._producer.send_async(builder.build(), 
functools.partial(_set_future, future))
         msg_id = await future
@@ -115,6 +173,18 @@ class Producer:
             msg_id.batch_index(),
         )
 
+    async def flush(self) -> None:
+        """
+        Flush all the messages buffered in the producer asynchronously.
+
+        Raises
+        ------
+        PulsarException
+        """
+        future = asyncio.get_running_loop().create_future()
+        self._producer.flush_async(functools.partial(_set_future, future, 
value=None))
+        await future
+
     async def close(self) -> None:
         """
         Close the producer.
@@ -127,6 +197,30 @@ class Producer:
         self._producer.close_async(functools.partial(_set_future, future, 
value=None))
         await future
 
+    def topic(self):
+        """
+        Return the topic which producer is publishing to
+        """
+        return self._producer.topic()
+
+    def producer_name(self):
+        """
+        Return the producer name which could have been assigned by the
+        system or specified by the client
+        """
+        return self._producer.producer_name()
+
+    def last_sequence_id(self):
+        """
+        Return the last sequence id that was published and acknowledged by 
this producer.
+
+        The sequence id can be either automatically assigned or custom set on 
the message.
+        After recreating a producer with the same name, this will return the 
sequence id
+        of the last message that was published in the previous session, or -1 
if no
+        message was ever published.
+        """
+        return self._producer.last_sequence_id()
+
 class Consumer:
     """
     The Pulsar message consumer, used to subscribe to messages from a topic.
@@ -311,7 +405,28 @@ class Client:
 
     # pylint: 
disable=too-many-arguments,too-many-locals,too-many-positional-arguments
     async def create_producer(self, topic: str,
+                              producer_name: str | None = None,
                               schema: pulsar.schema.Schema | None = None,
+                              initial_sequence_id: int | None = None,
+                              send_timeout_millis: int = 30000,
+                              compression_type: CompressionType = 
CompressionType.NONE,
+                              max_pending_messages: int = 1000,
+                              max_pending_messages_across_partitions: int = 
50000,
+                              block_if_queue_full: bool = False,
+                              batching_enabled: bool = True,
+                              batching_max_messages: int = 1000,
+                              batching_max_allowed_size_in_bytes: int = 
128*1024,
+                              batching_max_publish_delay_ms: int = 10,
+                              chunking_enabled: bool = False,
+                              message_routing_mode: PartitionsRoutingMode =
+                              PartitionsRoutingMode.RoundRobinDistribution,
+                              lazy_start_partitioned_producers: bool = False,
+                              properties: dict | None = None,
+                              batching_type: BatchingType = 
BatchingType.Default,
+                              encryption_key: str | None = None,
+                              crypto_key_reader: pulsar.CryptoKeyReader | None 
= None,
+                              access_mode: ProducerAccessMode = 
ProducerAccessMode.Shared,
+                              message_router: Callable[[pulsar.Message, int], 
int] | None = None,
                               ) -> Producer:
         """
         Create a new producer on a given topic
@@ -320,8 +435,60 @@ class Client:
         ----------
         topic: str
             The topic name
+        producer_name: str | None
+            Specify a name for the producer. If not assigned, the system will 
generate a globally
+            unique name which can be accessed with `Producer.producer_name()`. 
When specifying a
+            name, it is up to the user to ensure that, for a given topic, the 
producer name is
+            unique across all Pulsar's clusters.
         schema: pulsar.schema.Schema | None, default=None
             Define the schema of the data that will be published by this 
producer.
+        initial_sequence_id: int | None, default=None
+            Set the baseline for the sequence ids for messages published by
+            the producer.
+        send_timeout_millis: int, default=30000
+            If a message is not acknowledged by the server before the
+            send_timeout expires, an error will be reported.
+        compression_type: CompressionType, default=CompressionType.NONE
+            Set the compression type for the producer.
+        max_pending_messages: int, default=1000
+            Set the max size of the queue holding the messages pending to
+            receive an acknowledgment from the broker.
+        max_pending_messages_across_partitions: int, default=50000
+            Set the max size of the queue holding the messages pending to
+            receive an acknowledgment across partitions.
+        block_if_queue_full: bool, default=False
+            Set whether send operations should block when the outgoing
+            message queue is full.
+        batching_enabled: bool, default=True
+            Enable automatic message batching. Note that, unlike the 
synchronous producer API in
+            ``pulsar.Client.create_producer``, batching is enabled by default 
for the asyncio
+            producer.
+        batching_max_messages: int, default=1000
+            Maximum number of messages in a batch.
+        batching_max_allowed_size_in_bytes: int, default=128*1024
+            Maximum size in bytes of a batch.
+        batching_max_publish_delay_ms: int, default=10
+            The batch interval in milliseconds.
+        chunking_enabled: bool, default=False
+            Enable chunking of large messages.
+        message_routing_mode: PartitionsRoutingMode,
+            default=PartitionsRoutingMode.RoundRobinDistribution
+            Set the message routing mode for the partitioned producer.
+        lazy_start_partitioned_producers: bool, default=False
+            Start partitioned producers lazily on demand.
+        properties: dict | None, default=None
+            Sets the properties for the producer.
+        batching_type: BatchingType, default=BatchingType.Default
+            Sets the batching type for the producer.
+        encryption_key: str | None, default=None
+            The key used for symmetric encryption.
+        crypto_key_reader: pulsar.CryptoKeyReader | None, default=None
+            Symmetric encryption class implementation.
+        access_mode: ProducerAccessMode, default=ProducerAccessMode.Shared
+            Set the type of access mode that the producer requires on the 
topic.
+        message_router: Callable[[pulsar.Message, int], int] | None, 
default=None
+            A custom message router function that takes a Message and the
+            number of partitions and returns the partition index.
 
         Returns
         -------
@@ -332,13 +499,45 @@ class Client:
         ------
         PulsarException
         """
+        if batching_enabled and chunking_enabled:
+            raise ValueError("Batching and chunking of messages can't be 
enabled together.")
+
         if schema is None:
             schema = pulsar.schema.BytesSchema()
         schema.attach_client(self._client)
 
         future = asyncio.get_running_loop().create_future()
         conf = _pulsar.ProducerConfiguration()
+        if producer_name is not None:
+            conf.producer_name(producer_name)
         conf.schema(schema.schema_info())
+        if initial_sequence_id is not None:
+            conf.initial_sequence_id(initial_sequence_id)
+        conf.send_timeout_millis(send_timeout_millis)
+        conf.compression_type(compression_type)
+        conf.max_pending_messages(max_pending_messages)
+        
conf.max_pending_messages_across_partitions(max_pending_messages_across_partitions)
+        conf.block_if_queue_full(block_if_queue_full)
+        conf.batching_enabled(batching_enabled)
+        conf.batching_max_messages(batching_max_messages)
+        
conf.batching_max_allowed_size_in_bytes(batching_max_allowed_size_in_bytes)
+        conf.batching_max_publish_delay_ms(batching_max_publish_delay_ms)
+        conf.chunking_enabled(chunking_enabled)
+        conf.partitions_routing_mode(message_routing_mode)
+        conf.lazy_start_partitioned_producers(lazy_start_partitioned_producers)
+        if properties is not None:
+            for k, v in properties.items():
+                conf.property(k, v)
+        conf.batching_type(batching_type)
+        if encryption_key is not None:
+            conf.encryption_key(encryption_key)
+        if crypto_key_reader is not None:
+            conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
+        conf.access_mode(access_mode)
+        if message_router is not None:
+            def underlying_router(msg: _pulsar.Message, num_partitions: int) 
-> int:
+                return message_router(pulsar.Message._wrap(msg), 
num_partitions)
+            conf.message_router(underlying_router)
 
         self._client.create_producer_async(
             topic, conf, functools.partial(_set_future, future)
diff --git a/src/producer.cc b/src/producer.cc
index 9b38016..6a9b1a5 100644
--- a/src/producer.cc
+++ b/src/producer.cc
@@ -82,5 +82,6 @@ void export_producer(py::module_& m) {
              "successfully persisted\n")
         .def("close", &Producer_close)
         .def("close_async", &Producer_closeAsync)
-        .def("is_connected", &Producer::isConnected);
+        .def("is_connected", &Producer::isConnected)
+        .def("flush_async", &Producer::flushAsync);
 }
diff --git a/tests/asyncio_test.py b/tests/asyncio_test.py
index 656ffba..048dc43 100644
--- a/tests/asyncio_test.py
+++ b/tests/asyncio_test.py
@@ -60,12 +60,18 @@ class AsyncioTest(IsolatedAsyncioTestCase):
 
     async def test_batch_end_to_end(self):
         topic = f'asyncio-test-batch-e2e-{time.time()}'
-        producer = await self._client.create_producer(topic)
+        producer = await self._client.create_producer(topic,
+                                                      
producer_name="my-producer")
+        self.assertEqual(producer.topic(), 
f'persistent://public/default/{topic}')
+        self.assertEqual(producer.producer_name(), "my-producer")
         tasks = []
         for i in range(5):
             
tasks.append(asyncio.create_task(producer.send(f'msg-{i}'.encode())))
         msg_ids = await asyncio.gather(*tasks)
         self.assertEqual(len(msg_ids), 5)
+        # pylint: disable=fixme
+        # TODO: the result is wrong due to 
https://github.com/apache/pulsar-client-cpp/issues/531
+        self.assertEqual(producer.last_sequence_id(), 8)
         ledger_id = msg_ids[0].ledger_id()
         entry_id = msg_ids[0].entry_id()
         # These messages should be in the same entry
@@ -90,6 +96,42 @@ class AsyncioTest(IsolatedAsyncioTestCase):
         msg = await consumer.receive()
         self.assertEqual(msg.data(), b'final-message')
 
+    async def test_send_keyed_message(self):
+        topic = f'asyncio-test-send-keyed-message-{time.time()}'
+        producer = await self._client.create_producer(topic)
+        consumer = await self._client.subscribe(topic, 'sub')
+        await producer.send(b'msg', partition_key='key0',
+                                     ordering_key="key1", 
properties={'my-prop': 'my-value'})
+
+        msg = await consumer.receive()
+        self.assertEqual(msg.data(), b'msg')
+        self.assertEqual(msg.partition_key(), 'key0')
+        self.assertEqual(msg.ordering_key(), 'key1')
+        self.assertEqual(msg.properties(), {'my-prop': 'my-value'})
+
+    async def test_flush(self):
+        topic = f'asyncio-test-flush-{time.time()}'
+        producer = await self._client.create_producer(topic, 
batching_max_messages=3,
+                                                      
batching_max_publish_delay_ms=60000)
+        tasks = []
+        tasks.append(asyncio.create_task(producer.send(b'msg-0')))
+        tasks.append(asyncio.create_task(producer.send(b'msg-1')))
+
+        done, pending = await asyncio.wait(tasks, timeout=1, 
return_when=asyncio.FIRST_COMPLETED)
+        self.assertEqual(len(done), 0)
+        self.assertEqual(len(pending), 2)
+
+        # flush will trigger sending the batched messages
+        await producer.flush()
+        for task in pending:
+            self.assertTrue(task.done())
+        msg_id0 = tasks[0].result()
+        msg_id1 = tasks[1].result()
+        self.assertEqual(msg_id0.ledger_id(), msg_id1.ledger_id())
+        self.assertEqual(msg_id0.entry_id(), msg_id1.entry_id())
+        self.assertEqual(msg_id0.batch_index(), 0)
+        self.assertEqual(msg_id1.batch_index(), 1)
+
     async def test_create_producer_failure(self):
         try:
             await 
self._client.create_producer('tenant/ns/asyncio-test-send-failure')

Reply via email to