gemini-code-assist[bot] commented on code in PR #37345:
URL: https://github.com/apache/beam/pull/37345#discussion_r2766449810
##########
sdks/python/apache_beam/io/gcp/pubsub.py:
##########
@@ -658,12 +658,22 @@ def _flush(self):
# Deserialize the protobuf to get the original PubsubMessage
pubsub_msg = PubsubMessage._from_proto_str(elem)
- # Publish with the correct data and attributes
+ # Publish with the correct data, attributes, and ordering_key
if self.with_attributes and pubsub_msg.attributes:
future = self._pub_client.publish(
- self._topic, pubsub_msg.data, **pubsub_msg.attributes)
+ self._topic,
+ pubsub_msg.data,
+ ordering_key=pubsub_msg.ordering_key
+ if pubsub_msg.ordering_key else '',
+ **pubsub_msg.attributes)
else:
- future = self._pub_client.publish(self._topic, pubsub_msg.data)
+ if pubsub_msg.ordering_key:
+ future = self._pub_client.publish(
+ self._topic,
+ pubsub_msg.data,
+ ordering_key=pubsub_msg.ordering_key)
+ else:
+ future = self._pub_client.publish(self._topic, pubsub_msg.data)
Review Comment:

The logic for publishing messages can be simplified to reduce code
duplication and improve readability. Instead of separate code paths for
messages with and without attributes, you can build a dictionary of keyword
arguments for the `publish` call. This makes the code cleaner and easier to
maintain.
```python
publish_kwargs = {}
if self.with_attributes and pubsub_msg.attributes:
  publish_kwargs.update(pubsub_msg.attributes)
if pubsub_msg.ordering_key:
  publish_kwargs['ordering_key'] = pubsub_msg.ordering_key
future = self._pub_client.publish(
    self._topic, pubsub_msg.data, **publish_kwargs)
```
##########
sdks/python/apache_beam/io/gcp/pubsub_integration_test.py:
##########
@@ -305,6 +305,51 @@ def test_batch_write_with_attributes(self):
"""Test WriteToPubSub in batch mode with attributes."""
self._test_batch_write(with_attributes=True)
+ @pytest.mark.it_postcommit
+ def test_batch_write_with_ordering_key(self):
+ """Test WriteToPubSub in batch mode with ordering keys."""
+ from apache_beam.options.pipeline_options import PipelineOptions
+ from apache_beam.options.pipeline_options import StandardOptions
+ from apache_beam.transforms import Create
+
+ # Create test messages with ordering keys
+ test_messages = [
+ PubsubMessage(
+ b'order_data001', {'attr': 'value1'}, ordering_key='key1'),
+ PubsubMessage(
+ b'order_data002', {'attr': 'value2'}, ordering_key='key1'),
+ PubsubMessage(
+ b'order_data003', {'attr': 'value3'}, ordering_key='key2')
+ ]
+
+ pipeline_options = PipelineOptions()
+ pipeline_options.view_as(StandardOptions).streaming = False
+
+ with TestPipeline(options=pipeline_options) as p:
+ messages = p | 'CreateMessages' >> Create(test_messages)
+ _ = messages | 'WriteToPubSub' >> WriteToPubSub(
+ self.output_topic.name, with_attributes=True)
+
+ # Verify messages were published
+ time.sleep(10)
+
+ response = self.sub_client.pull(
+ request={
+ "subscription": self.output_sub.name,
+ "max_messages": 10,
+ })
+
+ self.assertEqual(len(response.received_messages), len(test_messages))
+
+ # Verify ordering keys were preserved
+ for received_message in response.received_messages:
+ self.assertIn('ordering_key', dir(received_message.message))
+ self.sub_client.acknowledge(
+ request={
+ "subscription": self.output_sub.name,
+ "ack_ids": [received_message.ack_id],
+ })
Review Comment:

The assertion in this test is quite weak. It only checks for the presence of
the `ordering_key` attribute on the message object using `dir()`, but doesn't
verify its value. A more robust test would be to check that the received
messages have the correct ordering keys and attributes corresponding to the
messages that were sent. Also, acknowledging messages one by one in a loop is
inefficient; it's better to collect all `ack_id`s and acknowledge them in a
single call.
```suggestion
# Verify ordering keys and attributes were preserved
received_msgs_map = {
    msg.message.data: msg.message
    for msg in response.received_messages
}
for sent_msg in test_messages:
  self.assertIn(sent_msg.data, received_msgs_map)
  received_msg = received_msgs_map[sent_msg.data]
  self.assertEqual(received_msg.ordering_key, sent_msg.ordering_key)
  self.assertEqual(dict(received_msg.attributes), sent_msg.attributes)
# Acknowledge all messages at once for efficiency
ack_ids = [msg.ack_id for msg in response.received_messages]
if ack_ids:
  self.sub_client.acknowledge(
      request={
          "subscription": self.output_sub.name,
          "ack_ids": ack_ids,
      })
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]