gemini-code-assist[bot] commented on code in PR #37345:
URL: https://github.com/apache/beam/pull/37345#discussion_r2766449810


##########
sdks/python/apache_beam/io/gcp/pubsub.py:
##########
@@ -658,12 +658,22 @@ def _flush(self):
       # Deserialize the protobuf to get the original PubsubMessage
       pubsub_msg = PubsubMessage._from_proto_str(elem)
 
-      # Publish with the correct data and attributes
+      # Publish with the correct data, attributes, and ordering_key
       if self.with_attributes and pubsub_msg.attributes:
         future = self._pub_client.publish(
-            self._topic, pubsub_msg.data, **pubsub_msg.attributes)
+            self._topic,
+            pubsub_msg.data,
+            ordering_key=pubsub_msg.ordering_key
+            if pubsub_msg.ordering_key else '',
+            **pubsub_msg.attributes)
       else:
-        future = self._pub_client.publish(self._topic, pubsub_msg.data)
+        if pubsub_msg.ordering_key:
+          future = self._pub_client.publish(
+              self._topic,
+              pubsub_msg.data,
+              ordering_key=pubsub_msg.ordering_key)
+        else:
+          future = self._pub_client.publish(self._topic, pubsub_msg.data)

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
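   The logic for publishing messages can be simplified to reduce code 
duplication and improve readability. The current change has separate code 
paths for messages with and without attributes, and a further nested branch 
for messages with and without an ordering key. It also passes an explicit 
`ordering_key=''` in the attributes path. Instead, you can build a single 
dictionary of keyword arguments and make one `publish` call. This makes the 
code cleaner and easier to maintain.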
   
   ```python
         publish_kwargs = {}
         if self.with_attributes and pubsub_msg.attributes:
           publish_kwargs.update(pubsub_msg.attributes)
   
         if pubsub_msg.ordering_key:
           publish_kwargs['ordering_key'] = pubsub_msg.ordering_key
   
         future = self._pub_client.publish(
             self._topic, pubsub_msg.data, **publish_kwargs)
   ```



##########
sdks/python/apache_beam/io/gcp/pubsub_integration_test.py:
##########
@@ -305,6 +305,51 @@ def test_batch_write_with_attributes(self):
     """Test WriteToPubSub in batch mode with attributes."""
     self._test_batch_write(with_attributes=True)
 
+  @pytest.mark.it_postcommit
+  def test_batch_write_with_ordering_key(self):
+    """Test WriteToPubSub in batch mode with ordering keys."""
+    from apache_beam.options.pipeline_options import PipelineOptions
+    from apache_beam.options.pipeline_options import StandardOptions
+    from apache_beam.transforms import Create
+
+    # Create test messages with ordering keys
+    test_messages = [
+        PubsubMessage(
+            b'order_data001', {'attr': 'value1'}, ordering_key='key1'),
+        PubsubMessage(
+            b'order_data002', {'attr': 'value2'}, ordering_key='key1'),
+        PubsubMessage(
+            b'order_data003', {'attr': 'value3'}, ordering_key='key2')
+    ]
+
+    pipeline_options = PipelineOptions()
+    pipeline_options.view_as(StandardOptions).streaming = False
+
+    with TestPipeline(options=pipeline_options) as p:
+      messages = p | 'CreateMessages' >> Create(test_messages)
+      _ = messages | 'WriteToPubSub' >> WriteToPubSub(
+          self.output_topic.name, with_attributes=True)
+
+    # Verify messages were published
+    time.sleep(10)
+
+    response = self.sub_client.pull(
+        request={
+            "subscription": self.output_sub.name,
+            "max_messages": 10,
+        })
+
+    self.assertEqual(len(response.received_messages), len(test_messages))
+
+    # Verify ordering keys were preserved
+    for received_message in response.received_messages:
+      self.assertIn('ordering_key', dir(received_message.message))
+      self.sub_client.acknowledge(
+          request={
+              "subscription": self.output_sub.name,
+              "ack_ids": [received_message.ack_id],
+          })

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The assertion in this test is quite weak. It only checks, via `dir()`, that 
an `ordering_key` attribute exists on the message object. Because 
`ordering_key` is a field of the message type, that check passes for every 
received message, whatever its value. A more robust test would check that 
the received messages carry the same ordering keys and attributes as the 
messages that were sent. Also, acknowledging messages one by one in a loop is 
inefficient. It is better to collect all `ack_id`s and acknowledge them in a 
single call.
   
   ```suggestion
       # Verify ordering keys and attributes were preserved
       received_msgs_map = {
           msg.message.data: msg.message
           for msg in response.received_messages
       }
   
       for sent_msg in test_messages:
         self.assertIn(sent_msg.data, received_msgs_map)
         received_msg = received_msgs_map[sent_msg.data]
         self.assertEqual(received_msg.ordering_key, sent_msg.ordering_key)
         self.assertEqual(dict(received_msg.attributes), sent_msg.attributes)
   
       # Acknowledge all messages at once for efficiency
       ack_ids = [msg.ack_id for msg in response.received_messages]
       if ack_ids:
         self.sub_client.acknowledge(
             request={
                 "subscription": self.output_sub.name,
                 "ack_ids": ack_ids,
             })
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to