Copilot commented on code in PR #280:
URL: 
https://github.com/apache/pulsar-client-python/pull/280#discussion_r2645165944


##########
pulsar/asyncio.py:
##########
@@ -127,6 +196,25 @@ async def close(self) -> None:
         self._producer.close_async(functools.partial(_set_future, future, 
value=None))
         await future
 
+    def topic(self):
+        """
+        Return the topic which producer is publishing to
+        """
+        return self._producer.topic()
+
+    def producer_name(self):
+        """
+        Return the producer name which could have been assigned by the
+        system or specified by the client
+        """
+        return self._producer.producer_name()
+
+    def last_sequence_id(self):
+        """
+        Return the last sequence id that was published by this producer.

Review Comment:
   The documentation for last_sequence_id() is incomplete compared to the 
synchronous API. It should include the additional details about what the 
sequence ID represents and its behavior after producer recreation. Consider 
adding the complete documentation from pulsar/__init__.py lines 1470-1481 which 
explains:
   - It represents the automatically assigned or custom sequence id that was 
published and acknowledged
   - After recreating a producer with the same name, it returns the last 
message from the previous session, or -1 if no message was ever published
   ```suggestion
           Return the last sequence id that was published and acknowledged by 
this producer.
   
           The sequence id can be either automatically assigned or custom set 
on the message.
           After recreating a producer with the same name, this will return the 
sequence id
           of the last message that was published in the previous session, or 
-1 if no
           message was ever published.
   ```



##########
pulsar/asyncio.py:
##########
@@ -84,7 +88,17 @@ def __init__(self, producer: _pulsar.Producer, schema: 
pulsar.schema.Schema) ->
         self._producer = producer
         self._schema = schema
 
-    async def send(self, content: Any) -> pulsar.MessageId:
+    # pylint: 
disable=too-many-arguments,too-many-locals,too-many-positional-arguments
+    async def send(self, content: Any,
+                   properties: dict | None = None,
+                   partition_key: str | None = None,
+                   ordering_key: str | None = None,
+                   sequence_id: int | None = None,
+                   replication_clusters: List[str] | None = None,
+                   disable_replication: bool | None = None,
+                   event_timestamp: int | None = None,
+                   deliver_at: int | None = None,
+                   deliver_after: int | None = None) -> pulsar.MessageId:

Review Comment:
   Type inconsistency: The deliver_after parameter is declared as int | None, 
but the synchronous API in pulsar/__init__.py (line 1644) expects a timedelta 
object. The documentation states "Specify a delay in timedelta" which matches 
the synchronous API, but the type annotation and implementation don't match. 
Either the type should be changed to accept timedelta objects (like the sync 
API), or the documentation should clarify that this is an int representing 
milliseconds.



##########
pulsar/asyncio.py:
##########
@@ -93,6 +107,28 @@ async def send(self, content: Any) -> pulsar.MessageId:
         content: Any
             The message payload, whose type should respect the schema defined 
in
             `Client.create_producer`.
+        properties: dict | None
+            A dict of application0-defined string properties.

Review Comment:
   Typo in documentation: "application0-defined" should be 
"application-defined".
   ```suggestion
               A dict of application-defined string properties.
   ```



##########
pulsar/asyncio.py:
##########
@@ -332,13 +493,45 @@ async def create_producer(self, topic: str,
         ------
         PulsarException
         """
+        if batching_enabled and chunking_enabled:
+            raise ValueError("Batching and chunking of messages can't be 
enabled together.")
+
         if schema is None:
             schema = pulsar.schema.BytesSchema()
         schema.attach_client(self._client)
 
         future = asyncio.get_running_loop().create_future()
         conf = _pulsar.ProducerConfiguration()
+        if producer_name is not None:
+            conf.producer_name(producer_name)
         conf.schema(schema.schema_info())
+        if initial_sequence_id is not None:
+            conf.initial_sequence_id(initial_sequence_id)
+        conf.send_timeout_millis(send_timeout_millis)
+        conf.compression_type(compression_type)
+        conf.max_pending_messages(max_pending_messages)
+        
conf.max_pending_messages_across_partitions(max_pending_messages_across_partitions)
+        conf.block_if_queue_full(block_if_queue_full)
+        conf.batching_enabled(batching_enabled)
+        conf.batching_max_messages(batching_max_messages)
+        
conf.batching_max_allowed_size_in_bytes(batching_max_allowed_size_in_bytes)
+        conf.batching_max_publish_delay_ms(batching_max_publish_delay_ms)
+        conf.chunking_enabled(chunking_enabled)
+        conf.partitions_routing_mode(message_routing_mode)
+        conf.lazy_start_partitioned_producers(lazy_start_partitioned_producers)
+        if properties is not None:
+            for k, v in properties.items():
+                conf.property(k, v)
+        conf.batching_type(batching_type)
+        if encryption_key is not None:
+            conf.encryption_key(encryption_key)
+        if crypto_key_reader is not None:
+            conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
+        conf.access_mode(access_mode)
+        if message_router is not None:
+            def default_router(msg: _pulsar.Message, num_partitions: int) -> 
int:
+                return int(msg.partition_key()) % num_partitions
+            conf.message_router(default_router)

Review Comment:
   The message_router parameter is accepted but not used correctly. When 
message_router is provided, the code defines a default_router function that 
uses msg.partition_key() instead of actually calling the provided 
message_router function. This means the custom router is completely ignored.
   
   Compare with the synchronous API in pulsar/__init__.py (line 990) which 
correctly wraps and calls the user-provided message_router function. The 
implementation should be:
   
   ```python
   underlying_router = lambda msg, num_partitions: 
int(message_router(pulsar.Message._wrap(msg), num_partitions))
   conf.message_router(underlying_router)
   ```
   ```suggestion
               underlying_router = (
                   lambda msg, num_partitions: int(
                       message_router(pulsar.Message._wrap(msg), num_partitions)
                   )
               )
               conf.message_router(underlying_router)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to