gemini-code-assist[bot] commented on code in PR #37345:
URL: https://github.com/apache/beam/pull/37345#discussion_r3305801314


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java:
##########
@@ -85,6 +90,9 @@ public PTransform<PCollection<byte[]>, PDone> buildExternal(Configuration config
       if (config.timestampAttribute != null) {
         writeBuilder.setTimestampAttribute(config.timestampAttribute);
       }
+      if (config.publishWithOrderingKey) {
+        writeBuilder.setPublishWithOrderingKey(true);
+      }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Instead of conditionally setting the ordering key option only when it is
true, we can pass the boolean configuration value directly to the builder. This
is equivalent as long as the builder's default for this option is `false`, and
it simplifies the code by removing an unnecessary conditional block.
   
   ```java
         writeBuilder.setPublishWithOrderingKey(config.publishWithOrderingKey);
   ```



##########
sdks/python/apache_beam/io/gcp/pubsub.py:
##########
@@ -647,21 +660,29 @@ def _flush(self):
     if not self._buffer:
       return
 
-    import time
-
     # The elements in buffer are serialized protobuf bytes from the previous
     # transforms. We need to deserialize them to extract data and attributes.
     futures = []
     for elem in self._buffer:
       # Deserialize the protobuf to get the original PubsubMessage
       pubsub_msg = PubsubMessage._from_proto_str(elem)
 
-      # Publish with the correct data and attributes
+      # Publish with the correct data, attributes, and ordering_key
       if self.with_attributes and pubsub_msg.attributes:
         future = self._pub_client.publish(
-            self._topic, pubsub_msg.data, **pubsub_msg.attributes)
+            self._topic,
+            pubsub_msg.data,
+            ordering_key=pubsub_msg.ordering_key
+            if pubsub_msg.ordering_key else '',
+            **pubsub_msg.attributes)
       else:
-        future = self._pub_client.publish(self._topic, pubsub_msg.data)
+        if pubsub_msg.ordering_key:
+          future = self._pub_client.publish(
+              self._topic,
+              pubsub_msg.data,
+              ordering_key=pubsub_msg.ordering_key)
+        else:
+          future = self._pub_client.publish(self._topic, pubsub_msg.data)

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
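   The current nested `if-else` blocks for publishing with/without attributes
and ordering keys can be significantly simplified and made more maintainable by
constructing a keyword arguments dictionary dynamically. This also makes the two
paths consistent: the attributes branch currently passes `ordering_key=''` when
no key is set, whereas the other branch omits the argument entirely.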
   
   ```python
         kwargs = {}
         if self.with_attributes and pubsub_msg.attributes:
           kwargs.update(pubsub_msg.attributes)
         if pubsub_msg.ordering_key:
           kwargs['ordering_key'] = pubsub_msg.ordering_key
         future = self._pub_client.publish(
             self._topic, pubsub_msg.data, **kwargs)
   ```



##########
sdks/python/apache_beam/io/gcp/pubsub_integration_test.py:
##########
@@ -305,6 +305,96 @@ def test_batch_write_with_attributes(self):
     """Test WriteToPubSub in batch mode with attributes."""
     self._test_batch_write(with_attributes=True)
 
+  @pytest.mark.it_postcommit
+  def test_batch_write_with_ordering_key(self):
+    """Test WriteToPubSub in batch mode with ordering keys.
+
+    Dataflow's Native Pub/Sub Sink does not support ordering_key
+    (see https://github.com/apache/beam/issues/36201), so this test
+    only applies to runners using Beam's Python WriteToPubSub Sink.
+    Dataflow users should use the XLang WriteToPubSub path instead
+    (apache_beam.io.external.gcp.pubsub.WriteToPubSub with
+    publish_with_ordering_key=True).
+    """
+    if self.runner_name == 'TestDataflowRunner':
+      self.skipTest(
+          'Dataflow Native PubSub Sink does not support ordering_key '
+          '(see https://github.com/apache/beam/issues/36201). '
+          'Use apache_beam.io.external.gcp.pubsub.WriteToPubSub '
+          'with publish_with_ordering_key=True instead.')
+    from google.pubsub_v1.types import Subscription
+
+    from apache_beam.options.pipeline_options import PipelineOptions
+    from apache_beam.options.pipeline_options import StandardOptions
+    from apache_beam.transforms import Create
+
+    ordering_topic = self.pub_client.create_topic(
+        name=self.pub_client.topic_path(
+            self.project, 'psit_topic_ordering' + self.uuid))
+    ordering_sub = self.sub_client.create_subscription(
+        request=Subscription(
+            name=self.sub_client.subscription_path(
+                self.project, 'psit_sub_ordering' + self.uuid),
+            topic=ordering_topic.name,
+            enable_message_ordering=True,
+        ))
+    time.sleep(10)
+
+    try:
+      test_messages = [
+          PubsubMessage(
+              b'order_data001', {'attr': 'value1'}, ordering_key='key1'),
+          PubsubMessage(
+              b'order_data002', {'attr': 'value2'}, ordering_key='key1'),
+          PubsubMessage(
+              b'order_data003', {'attr': 'value3'}, ordering_key='key2'),
+      ]
+
+      pipeline_options = PipelineOptions()
+      pipeline_options.view_as(StandardOptions).streaming = False
+
+      with TestPipeline(options=pipeline_options) as p:
+        messages = p | 'CreateMessages' >> Create(test_messages)
+        _ = messages | 'WriteToPubSub' >> WriteToPubSub(
+            ordering_topic.name, with_attributes=True)
+
+      time.sleep(10)
+
+      # Retry pulling to handle PubSub delivery delays
+      received_messages = []
+      deadline = time.time() + 60  # wait up to 60 seconds
+      while time.time() < deadline:
+        response = self.sub_client.pull(
+            request={
+                'subscription': ordering_sub.name,
+                'max_messages': 10,
+            })
+        received_messages.extend(response.received_messages)
+        if len(received_messages) >= len(test_messages):
+          break
+        time.sleep(5)
+
+      self.assertEqual(len(received_messages), len(test_messages))
+
+      received_map = {
+          msg.message.data: msg.message
+          for msg in received_messages
+      }
+      self.assertEqual(received_map[b'order_data001'].ordering_key, 'key1')
+      self.assertEqual(received_map[b'order_data002'].ordering_key, 'key1')
+      self.assertEqual(received_map[b'order_data003'].ordering_key, 'key2')
+
+      ack_ids = [msg.ack_id for msg in received_messages]
+      self.sub_client.acknowledge(

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   To make the integration test robust against potential duplicate messages 
delivered by Pub/Sub (which could cause the pull loop to exit prematurely 
before all unique messages are received), we can track received messages in a 
dictionary keyed by message data. This also simplifies the assertions and 
acknowledgment logic.
   
   ```python
         received_messages = {}
         deadline = time.time() + 60  # wait up to 60 seconds
         while time.time() < deadline:
           response = self.sub_client.pull(
               request={
                   'subscription': ordering_sub.name,
                   'max_messages': 10,
               })
           for msg in response.received_messages:
             received_messages[msg.message.data] = msg
           if len(received_messages) >= len(test_messages):
             break
           time.sleep(5)
   
         self.assertEqual(len(received_messages), len(test_messages))
   
         self.assertEqual(
             received_messages[b'order_data001'].message.ordering_key, 'key1')
         self.assertEqual(
             received_messages[b'order_data002'].message.ordering_key, 'key1')
         self.assertEqual(
             received_messages[b'order_data003'].message.ordering_key, 'key2')
   
         ack_ids = [msg.ack_id for msg in received_messages.values()]
         self.sub_client.acknowledge(
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to