Copilot commented on code in PR #280:
URL:
https://github.com/apache/pulsar-client-python/pull/280#discussion_r2645165944
##########
pulsar/asyncio.py:
##########
@@ -127,6 +196,25 @@ async def close(self) -> None:
self._producer.close_async(functools.partial(_set_future, future,
value=None))
await future
+ def topic(self):
+ """
+ Return the topic which producer is publishing to
+ """
+ return self._producer.topic()
+
+ def producer_name(self):
+ """
+ Return the producer name which could have been assigned by the
+ system or specified by the client
+ """
+ return self._producer.producer_name()
+
+ def last_sequence_id(self):
+ """
+ Return the last sequence id that was published by this producer.
Review Comment:
The documentation for last_sequence_id() is incomplete compared to the
synchronous API. It should include the additional details about what the
sequence ID represents and its behavior after producer recreation. Consider
adding the complete documentation from pulsar/__init__.py lines 1470-1481 which
explains:
- It represents the automatically assigned or custom sequence id that was
published and acknowledged
- After recreating a producer with the same name, it returns the last
message from the previous session, or -1 if no message was ever published
```suggestion
Return the last sequence id that was published and acknowledged by
this producer.
The sequence id can be either automatically assigned or custom set
on the message.
After recreating a producer with the same name, this will return the
sequence id of the last message that was published in the previous
session, or -1 if no message was ever published.
```
##########
pulsar/asyncio.py:
##########
@@ -84,7 +88,17 @@ def __init__(self, producer: _pulsar.Producer, schema: pulsar.schema.Schema) ->
self._producer = producer
self._schema = schema
- async def send(self, content: Any) -> pulsar.MessageId:
+ # pylint: disable=too-many-arguments,too-many-locals,too-many-positional-arguments
+ async def send(self, content: Any,
+ properties: dict | None = None,
+ partition_key: str | None = None,
+ ordering_key: str | None = None,
+ sequence_id: int | None = None,
+ replication_clusters: List[str] | None = None,
+ disable_replication: bool | None = None,
+ event_timestamp: int | None = None,
+ deliver_at: int | None = None,
+ deliver_after: int | None = None) -> pulsar.MessageId:
Review Comment:
Type inconsistency: The deliver_after parameter is declared as int | None,
but the synchronous API in pulsar/__init__.py (line 1644) expects a timedelta
object. The documentation states "Specify a delay in timedelta" which matches
the synchronous API, but the type annotation and implementation don't match.
Either the type should be changed to accept timedelta objects (like the sync
API), or the documentation should clarify that this is an int representing
milliseconds.
##########
pulsar/asyncio.py:
##########
@@ -93,6 +107,28 @@ async def send(self, content: Any) -> pulsar.MessageId:
content: Any
The message payload, whose type should respect the schema defined
in
`Client.create_producer`.
+ properties: dict | None
+ A dict of application0-defined string properties.
Review Comment:
Typo in documentation: "application0-defined" should be
"application-defined".
```suggestion
A dict of application-defined string properties.
```
##########
pulsar/asyncio.py:
##########
@@ -332,13 +493,45 @@ async def create_producer(self, topic: str,
------
PulsarException
"""
+ if batching_enabled and chunking_enabled:
+ raise ValueError("Batching and chunking of messages can't be enabled together.")
+
if schema is None:
schema = pulsar.schema.BytesSchema()
schema.attach_client(self._client)
future = asyncio.get_running_loop().create_future()
conf = _pulsar.ProducerConfiguration()
+ if producer_name is not None:
+ conf.producer_name(producer_name)
conf.schema(schema.schema_info())
+ if initial_sequence_id is not None:
+ conf.initial_sequence_id(initial_sequence_id)
+ conf.send_timeout_millis(send_timeout_millis)
+ conf.compression_type(compression_type)
+ conf.max_pending_messages(max_pending_messages)
+ conf.max_pending_messages_across_partitions(max_pending_messages_across_partitions)
+ conf.block_if_queue_full(block_if_queue_full)
+ conf.batching_enabled(batching_enabled)
+ conf.batching_max_messages(batching_max_messages)
+ conf.batching_max_allowed_size_in_bytes(batching_max_allowed_size_in_bytes)
+ conf.batching_max_publish_delay_ms(batching_max_publish_delay_ms)
+ conf.chunking_enabled(chunking_enabled)
+ conf.partitions_routing_mode(message_routing_mode)
+ conf.lazy_start_partitioned_producers(lazy_start_partitioned_producers)
+ if properties is not None:
+ for k, v in properties.items():
+ conf.property(k, v)
+ conf.batching_type(batching_type)
+ if encryption_key is not None:
+ conf.encryption_key(encryption_key)
+ if crypto_key_reader is not None:
+ conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
+ conf.access_mode(access_mode)
+ if message_router is not None:
+ def default_router(msg: _pulsar.Message, num_partitions: int) -> int:
+ return int(msg.partition_key()) % num_partitions
+ conf.message_router(default_router)
Review Comment:
The message_router parameter is accepted but not used correctly. When
message_router is provided, the code defines a default_router function that
uses msg.partition_key() instead of actually calling the provided
message_router function. This means the custom router is completely ignored.
Compare with the synchronous API in pulsar/__init__.py (line 990) which
correctly wraps and calls the user-provided message_router function. The
implementation should be:
```python
underlying_router = lambda msg, num_partitions: int(
    message_router(pulsar.Message._wrap(msg), num_partitions))
conf.message_router(underlying_router)
```
```suggestion
underlying_router = (
lambda msg, num_partitions: int(
message_router(pulsar.Message._wrap(msg), num_partitions)
)
)
conf.message_router(underlying_router)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]