gemini-code-assist[bot] commented on code in PR #37345:
URL: https://github.com/apache/beam/pull/37345#discussion_r2703311041


##########
sdks/python/apache_beam/io/gcp/pubsub_test.py:
##########
@@ -1098,6 +1097,71 @@ def test_write_to_pubsub_with_attributes_no_overwrite(self, unused_mock):
         Lineage.query(p.result.metrics(), Lineage.SINK),
         set(["pubsub:topic:fakeprj.a_topic"]))
 
+  def test_write_messages_with_ordering_key(self, mock_pubsub):
+    """Test WriteToPubSub with ordering_key in messages."""
+    data = b'data'
+    ordering_key = 'order-123'
+    attributes = {'key': 'value'}
+    payloads = [PubsubMessage(data, attributes, ordering_key=ordering_key)]
+
+    options = PipelineOptions([])
+    options.view_as(StandardOptions).streaming = True
+    with TestPipeline(options=options) as p:
+      _ = (
+          p
+          | Create(payloads)
+          | WriteToPubSub(
+              'projects/fakeprj/topics/a_topic', with_attributes=True))
+
+    # Verify that publish was called with ordering_key
+    mock_pubsub.return_value.publish.assert_called()
+    call_args = mock_pubsub.return_value.publish.call_args
+
+    # Check that ordering_key was passed as a keyword argument
+    self.assertIn('ordering_key', call_args.kwargs)
+    self.assertEqual(call_args.kwargs['ordering_key'], ordering_key)
+
+  def test_write_messages_with_ordering_key_no_attributes(self, mock_pubsub):
+    """Test WriteToPubSub with ordering_key but no attributes."""
+    data = b'data'
+    ordering_key = 'order-456'
+    payloads = [PubsubMessage(data, None, ordering_key=ordering_key)]
+
+    options = PipelineOptions([])
+    options.view_as(StandardOptions).streaming = True
+    with TestPipeline(options=options) as p:
+      _ = (
+          p
+          | Create(payloads)
+          | WriteToPubSub(
+              'projects/fakeprj/topics/a_topic', with_attributes=True))
+
+    # Verify that publish was called with ordering_key
+    mock_pubsub.return_value.publish.assert_called()
+    call_args = mock_pubsub.return_value.publish.call_args
+
+    # Check that ordering_key was passed
+    self.assertIn('ordering_key', call_args.kwargs)
+    self.assertEqual(call_args.kwargs['ordering_key'], ordering_key)
+
+  def test_write_messages_without_ordering_key(self, mock_pubsub):
+    """Test WriteToPubSub without ordering_key (backward compatibility)."""
+    data = b'data'
+    attributes = {'key': 'value'}
+    payloads = [PubsubMessage(data, attributes)]  # No ordering_key
+
+    options = PipelineOptions([])
+    options.view_as(StandardOptions).streaming = True
+    with TestPipeline(options=options) as p:
+      _ = (
+          p
+          | Create(payloads)
+          | WriteToPubSub(
+              'projects/fakeprj/topics/a_topic', with_attributes=True))
+
+    # Verify that publish was called
+    mock_pubsub.return_value.publish.assert_called()

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
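   To make this test more robust, you should also verify that `ordering_key` is
not passed to the `publish` method when no ordering key is provided in the
`PubsubMessage`. You can do this by checking the `call_args` of the mock. Note
that the attributes branch in the `pubsub.py` hunk below currently passes
`ordering_key=''` when no key is set, so if this test exercises that path the
assertion will only hold once that branch omits the argument.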
   
   ```suggestion
       # Verify that publish was called
       mock_pubsub.return_value.publish.assert_called()
       call_args = mock_pubsub.return_value.publish.call_args
       self.assertNotIn('ordering_key', call_args.kwargs)
   ```



##########
sdks/python/apache_beam/io/gcp/pubsub_integration_test.py:
##########
@@ -305,6 +305,51 @@ def test_batch_write_with_attributes(self):
     """Test WriteToPubSub in batch mode with attributes."""
     self._test_batch_write(with_attributes=True)
 
+  @pytest.mark.it_postcommit
+  def test_batch_write_with_ordering_key(self):
+    """Test WriteToPubSub in batch mode with ordering keys."""
+    from apache_beam.options.pipeline_options import PipelineOptions
+    from apache_beam.options.pipeline_options import StandardOptions
+    from apache_beam.transforms import Create
+
+    # Create test messages with ordering keys
+    test_messages = [
+        PubsubMessage(
+            b'order_data001', {'attr': 'value1'}, ordering_key='key1'),
+        PubsubMessage(
+            b'order_data002', {'attr': 'value2'}, ordering_key='key1'),
+        PubsubMessage(
+            b'order_data003', {'attr': 'value3'}, ordering_key='key2')
+    ]
+
+    pipeline_options = PipelineOptions()
+    pipeline_options.view_as(StandardOptions).streaming = False
+
+    with TestPipeline(options=pipeline_options) as p:
+      messages = p | 'CreateMessages' >> Create(test_messages)
+      _ = messages | 'WriteToPubSub' >> WriteToPubSub(
+          self.output_topic.name, with_attributes=True)
+
+    # Verify messages were published
+    time.sleep(10)
+
+    response = self.sub_client.pull(
+        request={
+            "subscription": self.output_sub.name,
+            "max_messages": 10,
+        })
+
+    self.assertEqual(len(response.received_messages), len(test_messages))
+
+    # Verify ordering keys were preserved
+    for received_message in response.received_messages:
+      self.assertIn('ordering_key', dir(received_message.message))
+      self.sub_client.acknowledge(
+          request={
+              "subscription": self.output_sub.name,
+              "ack_ids": [received_message.ack_id],
+          })

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The current verification for `ordering_key` only checks that the attribute
exists on the received message object (via `dir(...)`), not what it contains. To
make the test more robust, you should assert that the `ordering_key` has the
expected value for each message.
   
   Additionally, you can improve efficiency by acknowledging all messages in a 
single batch request after pulling them, rather than one by one inside the loop.
   
   ```suggestion
       received_map = {msg.message.data: msg for msg in response.received_messages}
       self.assertEqual(received_map[b'order_data001'].message.ordering_key, 'key1')
       self.assertEqual(received_map[b'order_data002'].message.ordering_key, 'key1')
       self.assertEqual(received_map[b'order_data003'].message.ordering_key, 'key2')
   
       ack_ids = [msg.ack_id for msg in response.received_messages]
       if ack_ids:
         self.sub_client.acknowledge(
             request={
                 "subscription": self.output_sub.name,
                 "ack_ids": ack_ids,
             })
   ```



##########
sdks/python/apache_beam/io/gcp/pubsub.py:
##########
@@ -658,12 +658,22 @@ def _flush(self):
       # Deserialize the protobuf to get the original PubsubMessage
       pubsub_msg = PubsubMessage._from_proto_str(elem)
 
-      # Publish with the correct data and attributes
+      # Publish with the correct data, attributes, and ordering_key
       if self.with_attributes and pubsub_msg.attributes:
         future = self._pub_client.publish(
-            self._topic, pubsub_msg.data, **pubsub_msg.attributes)
+            self._topic,
+            pubsub_msg.data,
+            ordering_key=pubsub_msg.ordering_key
+            if pubsub_msg.ordering_key else '',
+            **pubsub_msg.attributes)
       else:
-        future = self._pub_client.publish(self._topic, pubsub_msg.data)
+        if pubsub_msg.ordering_key:
+          future = self._pub_client.publish(
+              self._topic,
+              pubsub_msg.data,
+              ordering_key=pubsub_msg.ordering_key)
+        else:
+          future = self._pub_client.publish(self._topic, pubsub_msg.data)

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The logic for publishing messages can be simplified to improve readability
and reduce code duplication. You can determine the attributes to use up front
and then branch only on `ordering_key`, which reduces the three
`self._pub_client.publish` call sites to two and avoids passing an empty
`ordering_key` when none is set. This makes the code easier to maintain.
   
   ```python
         attributes = {}
         if self.with_attributes and pubsub_msg.attributes:
           attributes = pubsub_msg.attributes
   
         if pubsub_msg.ordering_key:
           future = self._pub_client.publish(
               self._topic,
               pubsub_msg.data,
               ordering_key=pubsub_msg.ordering_key,
               **attributes)
         else:
           future = self._pub_client.publish(
               self._topic, pubsub_msg.data, **attributes)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to