tvalentyn commented on code in PR #37345:
URL: https://github.com/apache/beam/pull/37345#discussion_r3390769476


##########
sdks/python/apache_beam/io/gcp/pubsub_test.py:
##########
@@ -1098,6 +1097,71 @@ def 
test_write_to_pubsub_with_attributes_no_overwrite(self, unused_mock):
         Lineage.query(p.result.metrics(), Lineage.SINK),
         set(["pubsub:topic:fakeprj.a_topic"]))
 
+  def test_write_messages_with_ordering_key(self, mock_pubsub):
+    """Test WriteToPubSub with ordering_key in messages."""
+    data = b'data'
+    ordering_key = 'order-123'
+    attributes = {'key': 'value'}
+    payloads = [PubsubMessage(data, attributes, ordering_key=ordering_key)]
+
+    options = PipelineOptions([])
+    options.view_as(StandardOptions).streaming = True
+    with TestPipeline(options=options) as p:
+      _ = (
+          p
+          | Create(payloads)
+          | WriteToPubSub(
+              'projects/fakeprj/topics/a_topic', with_attributes=True))
+
+    # Verify that publish was called with ordering_key
+    mock_pubsub.return_value.publish.assert_called()
+    call_args = mock_pubsub.return_value.publish.call_args
+
+    # Check that ordering_key was passed as a keyword argument
+    self.assertIn('ordering_key', call_args.kwargs)
+    self.assertEqual(call_args.kwargs['ordering_key'], ordering_key)
+
+  def test_write_messages_with_ordering_key_no_attributes(self, mock_pubsub):
+    """Test WriteToPubSub with ordering_key but no attributes."""
+    data = b'data'
+    ordering_key = 'order-456'
+    payloads = [PubsubMessage(data, None, ordering_key=ordering_key)]
+
+    options = PipelineOptions([])
+    options.view_as(StandardOptions).streaming = True
+    with TestPipeline(options=options) as p:
+      _ = (
+          p
+          | Create(payloads)
+          | WriteToPubSub(
+              'projects/fakeprj/topics/a_topic', with_attributes=True))
+
+    # Verify that publish was called with ordering_key
+    mock_pubsub.return_value.publish.assert_called()
+    call_args = mock_pubsub.return_value.publish.call_args
+
+    # Check that ordering_key was passed
+    self.assertIn('ordering_key', call_args.kwargs)
+    self.assertEqual(call_args.kwargs['ordering_key'], ordering_key)
+
+  def test_write_messages_without_ordering_key(self, mock_pubsub):
+    """Test WriteToPubSub without ordering_key (backward compatibility)."""
+    data = b'data'
+    attributes = {'key': 'value'}
+    payloads = [PubsubMessage(data, attributes)]  # No ordering_key
+
+    options = PipelineOptions([])
+    options.view_as(StandardOptions).streaming = True
+    with TestPipeline(options=options) as p:
+      _ = (
+          p
+          | Create(payloads)
+          | WriteToPubSub(
+              'projects/fakeprj/topics/a_topic', with_attributes=True))
+
+    # Verify that publish was called
+    mock_pubsub.return_value.publish.assert_called()

Review Comment:
   +1 to the gemini-code-assist comment on Jan 18



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to