This is an automated email from the ASF dual-hosted git repository.

tvalentyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 0bc785253eb Fix WriteToPubSub to pass ordering_key to publish() method (#37345)
0bc785253eb is described below

commit 0bc785253ebb7622496f134934031fd07a68c8bc
Author: Nikita Grover <[email protected]>
AuthorDate: Wed Jun 17 07:07:29 2026 +0530

    Fix WriteToPubSub to pass ordering_key to publish() method (#37345)
    
    * Fix WriteToPubSub to pass ordering_key to publish() method
    
    Fixes #36201
    
    * Update pubsub_test.py
    
    * Update pubsub_test.py
    
    * Trigger CI rerun
    
    * Retry CI (flake)
    
    * Apply yapf formatting
    
    * Address review comments: use message_to_proto_str and skip ordering key test on Dataflow
    
    * Add Dataflow warning in WriteToPubSub.expand() for ordering key support
    
    * Update pubsub_integration_test.py
    
    * Update PR and modification in beam_PostCommit_Python.json
    
    Updated PR number and modification count.
    
    * Update pubsub_integration_test.py
    
    * Update pubsub_integration_test.py
    
    * Fix PubSub error
    
    * Update pubsub_integration_test.py
    
    * Fix formatting: remove trailing whitespace
    
    * Fix ordering key integration test: retry pull loop, fix indentation
    
    * Fix import order: google imports before apache_beam (isort)
    
    * Fix import ordering in pubsub_integration_test.py
    
    * Simplify publish kwargs, conditionally enable message ordering, add retry loop in integration test
    
    * Use with_attributes to initialize with_ordering and apply in PublisherClient setup
    
    * Fix PubSub tests: enable message ordering in PublisherClient to exercise full ordering flow
    
    * Rename to publish_with_ordering_key, gate Dataflow warning, fix test assertions
    
    ---------
    
    Co-authored-by: Nikita Grover <[email protected]>
---
 .github/trigger_files/beam_PostCommit_Python.json  |  4 +-
 .../beam/sdk/io/gcp/pubsub/ExternalWrite.java      |  6 ++
 .../apache/beam/sdk/io/gcp/pubsub/PubsubIO.java    |  5 +-
 sdks/python/apache_beam/io/external/gcp/pubsub.py  | 11 ++-
 sdks/python/apache_beam/io/gcp/pubsub.py           | 48 +++++++++---
 .../apache_beam/io/gcp/pubsub_integration_test.py  | 90 ++++++++++++++++++++++
 sdks/python/apache_beam/io/gcp/pubsub_test.py      | 72 +++++++++++++++++
 7 files changed, 219 insertions(+), 17 deletions(-)

diff --git a/.github/trigger_files/beam_PostCommit_Python.json 
b/.github/trigger_files/beam_PostCommit_Python.json
index def2ac98408..11064375d62 100644
--- a/.github/trigger_files/beam_PostCommit_Python.json
+++ b/.github/trigger_files/beam_PostCommit_Python.json
@@ -1,5 +1,5 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to 
run.",
-  "pr": "38069",
+  "pr": "37345",
   "modification": 49
-}
+} 
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java
index 658d1fc29e3..23b14b68a08 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java
@@ -53,6 +53,7 @@ public final class ExternalWrite implements 
ExternalTransformRegistrar {
     private String topic;
     private @Nullable String idAttribute;
     private @Nullable String timestampAttribute;
+    private boolean publishWithOrderingKey = false;
 
     public void setTopic(String topic) {
       this.topic = topic;
@@ -65,6 +66,10 @@ public final class ExternalWrite implements 
ExternalTransformRegistrar {
     public void setTimestampAttribute(@Nullable String timestampAttribute) {
       this.timestampAttribute = timestampAttribute;
     }
+
+    public void setPublishWithOrderingKey(Boolean publishWithOrderingKey) {
+      this.publishWithOrderingKey = publishWithOrderingKey != null && 
publishWithOrderingKey;
+    }
   }
 
   public static class WriteBuilder
@@ -85,6 +90,7 @@ public final class ExternalWrite implements 
ExternalTransformRegistrar {
       if (config.timestampAttribute != null) {
         writeBuilder.setTimestampAttribute(config.timestampAttribute);
       }
+      writeBuilder.setPublishWithOrderingKey(config.publishWithOrderingKey);
       writeBuilder.setDynamicDestinations(false);
       return writeBuilder.build();
     }
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
index d62d294ed2a..57005745044 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
@@ -1727,7 +1727,10 @@ public class PubsubIO {
         this.pubsubClient =
             getPubsubClientFactory()
                 .newClient(
-                    getTimestampAttribute(), null, 
c.getPipelineOptions().as(PubsubOptions.class));
+                    getTimestampAttribute(),
+                    null,
+                    c.getPipelineOptions().as(PubsubOptions.class),
+                    Write.this.getPubsubRootUrl());
       }
 
       @ProcessElement
diff --git a/sdks/python/apache_beam/io/external/gcp/pubsub.py 
b/sdks/python/apache_beam/io/external/gcp/pubsub.py
index a2a3430f9a1..3125c042227 100644
--- a/sdks/python/apache_beam/io/external/gcp/pubsub.py
+++ b/sdks/python/apache_beam/io/external/gcp/pubsub.py
@@ -117,6 +117,7 @@ WriteToPubsubSchema = typing.NamedTuple(
         # this is not implemented yet on the Java side:
         # ('with_attributes', bool),
         ('timestamp_attribute', typing.Optional[str]),
+        ('publish_with_ordering_key', bool),
     ])
 
 
@@ -135,6 +136,7 @@ class WriteToPubSub(beam.PTransform):
       with_attributes=False,
       id_label=None,
       timestamp_attribute=None,
+      publish_with_ordering_key=False,
       expansion_service=None):
     """Initializes ``WriteToPubSub``.
 
@@ -150,18 +152,23 @@ class WriteToPubSub(beam.PTransform):
         in a ReadFromPubSub PTransform to deduplicate messages.
       timestamp_attribute: If set, will set an attribute for each Cloud Pub/Sub
         message with the given name and the message's publish time as the 
value.
+      publish_with_ordering_key: If True, enables ordering key support when
+        publishing messages. The ordering key must be set on each
+        PubsubMessage via the ``ordering_key`` attribute. 
     """
     self.params = WriteToPubsubSchema(
         topic=topic,
         id_label=id_label,
         # with_attributes=with_attributes,
-        timestamp_attribute=timestamp_attribute)
+        timestamp_attribute=timestamp_attribute,
+        publish_with_ordering_key=publish_with_ordering_key)
     self.expansion_service = expansion_service
     self.with_attributes = with_attributes
 
   def expand(self, pvalue):
     if self.with_attributes:
-      pcoll = pvalue | 'ToProto' >> Map(pubsub.WriteToPubSub.to_proto_str)
+      pcoll = pvalue | 'ToProto' >> Map(
+          pubsub.WriteToPubSub.message_to_proto_str)
     else:
       pcoll = pvalue | 'ToProto' >> Map(
           lambda x: pubsub.PubsubMessage(x, {})._to_proto_str())
diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py 
b/sdks/python/apache_beam/io/gcp/pubsub.py
index 276103f5276..55856f50478 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub.py
@@ -31,8 +31,9 @@ https://github.com/apache/beam/blob/master/sdks/python/OWNERS
 """
 
 # pytype: skip-file
-
+import logging
 import re
+import time
 from typing import Any
 from typing import NamedTuple
 from typing import Optional
@@ -388,7 +389,8 @@ class WriteToPubSub(PTransform):
       topic: str,
       with_attributes: bool = False,
       id_label: Optional[str] = None,
-      timestamp_attribute: Optional[str] = None) -> None:
+      timestamp_attribute: Optional[str] = None,
+      publish_with_ordering_key: bool = False) -> None:
     """Initializes ``WriteToPubSub``.
 
     Args:
@@ -404,9 +406,13 @@ class WriteToPubSub(PTransform):
         in a ReadFromPubSub PTransform to deduplicate messages.
       timestamp_attribute: If set, will set an attribute for each Cloud Pub/Sub
         message with the given name and the message's publish time as the 
value.
+      publish_with_ordering_key: If True, enables message ordering on the
+        PublisherClient. Messages with an ordering_key will be delivered
+        in order. Requires messages to have ordering_key set.
     """
     super().__init__()
     self.with_attributes = with_attributes
+    self.publish_with_ordering_key = publish_with_ordering_key
     self.id_label = id_label
     self.timestamp_attribute = timestamp_attribute
     self.project, self.topic_name = parse_topic(topic)
@@ -430,7 +436,16 @@ class WriteToPubSub(PTransform):
   def expand(self, pcoll):
     # Store pipeline options for use in DoFn
     self.pipeline_options = pcoll.pipeline.options if pcoll.pipeline else None
-
+    # Warn Dataflow users to use the XLang path for ordering key support,
+    # since _PubSubWriteDoFn._flush() is not used by Dataflow's implementation.
+    runner = self.pipeline_options.get_all_options().get(
+        'runner', '') if self.pipeline_options else ''
+    if 'Dataflow' in str(runner) and self.publish_with_ordering_key:
+      logging.warning(
+          'WriteToPubSub ordering_key support is not available on Dataflow '
+          'via this transform. Use the XLang WriteToPubSub path instead: '
+          'apache_beam.io.external.gcp.pubsub.WriteToPubSub with '
+          'publish_with_ordering_key=True.')
     if self.with_attributes:
       pcoll = pcoll | 'ToProtobufX' >> ParDo(
           _AddMetricsAndMap(
@@ -457,6 +472,9 @@ class WriteToPubSub(PTransform):
             True, label='With Attributes').drop_if_none(),
         'timestamp_attribute': DisplayDataItem(
             self.timestamp_attribute, label='Timestamp Attribute'),
+        'publish_with_ordering_key': DisplayDataItem(
+            self.publish_with_ordering_key,
+            label='Publish With Ordering Key').drop_if_none(),
     }
 
 
@@ -563,6 +581,7 @@ class _PubSubWriteDoFn(DoFn):
     self.id_label = transform.id_label
     self.timestamp_attribute = transform.timestamp_attribute
     self.with_attributes = transform.with_attributes
+    self.with_ordering = transform.publish_with_ordering_key
 
     # TODO(https://github.com/apache/beam/issues/18939): Add support for
     # id_label and timestamp_attribute.
@@ -597,7 +616,7 @@ class _PubSubWriteDoFn(DoFn):
       output_labels_supported = False
 
     # Log debug information for troubleshooting
-    import logging
+
     runner_info = getattr(
         pipeline_options, 'runner',
         'None') if pipeline_options else 'No options'
@@ -628,7 +647,13 @@ class _PubSubWriteDoFn(DoFn):
 
   def setup(self):
     from google.cloud import pubsub
-    self._pub_client = pubsub.PublisherClient()
+    if self.with_ordering:
+      self._pub_client = pubsub.PublisherClient(
+          publisher_options=pubsub.types.PublisherOptions(
+              enable_message_ordering=True,
+          ))
+    else:
+      self._pub_client = pubsub.PublisherClient()
     self._topic = self._pub_client.topic_path(
         self.project, self.short_topic_name)
 
@@ -647,8 +672,6 @@ class _PubSubWriteDoFn(DoFn):
     if not self._buffer:
       return
 
-    import time
-
     # The elements in buffer are serialized protobuf bytes from the previous
     # transforms. We need to deserialize them to extract data and attributes.
     futures = []
@@ -656,12 +679,13 @@ class _PubSubWriteDoFn(DoFn):
       # Deserialize the protobuf to get the original PubsubMessage
       pubsub_msg = PubsubMessage._from_proto_str(elem)
 
-      # Publish with the correct data and attributes
+      # Publish with the correct data, attributes, and ordering_key
+      kwargs = {}
       if self.with_attributes and pubsub_msg.attributes:
-        future = self._pub_client.publish(
-            self._topic, pubsub_msg.data, **pubsub_msg.attributes)
-      else:
-        future = self._pub_client.publish(self._topic, pubsub_msg.data)
+        kwargs.update(pubsub_msg.attributes)
+      if pubsub_msg.ordering_key:
+        kwargs['ordering_key'] = pubsub_msg.ordering_key
+      future = self._pub_client.publish(self._topic, pubsub_msg.data, **kwargs)
 
       futures.append(future)
 
diff --git a/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py 
b/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py
index 8387fe734fc..e67c5f2a370 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py
@@ -305,6 +305,96 @@ class PubSubIntegrationTest(unittest.TestCase):
     """Test WriteToPubSub in batch mode with attributes."""
     self._test_batch_write(with_attributes=True)
 
+  @pytest.mark.it_postcommit
+  def test_batch_write_with_ordering_key(self):
+    """Test WriteToPubSub in batch mode with ordering keys.
+
+    Dataflow's Native Pub/Sub Sink does not support ordering_key
+    (see https://github.com/apache/beam/issues/36201), so this test
+    only applies to runners using Beam's Python WriteToPubSub Sink.
+    Dataflow users should use the XLang WriteToPubSub path instead
+    (apache_beam.io.external.gcp.pubsub.WriteToPubSub with
+    publish_with_ordering_key=True).
+    """
+    if self.runner_name == 'TestDataflowRunner':
+      self.skipTest(
+          'Dataflow Native PubSub Sink does not support ordering_key '
+          '(see https://github.com/apache/beam/issues/36201).')
+    from google.pubsub_v1.types import Subscription
+
+    from apache_beam.options.pipeline_options import PipelineOptions
+    from apache_beam.options.pipeline_options import StandardOptions
+    from apache_beam.transforms import Create
+
+    ordering_topic = self.pub_client.create_topic(
+        name=self.pub_client.topic_path(
+            self.project, 'psit_topic_ordering' + self.uuid))
+    ordering_sub = self.sub_client.create_subscription(
+        request=Subscription(
+            name=self.sub_client.subscription_path(
+                self.project, 'psit_sub_ordering' + self.uuid),
+            topic=ordering_topic.name,
+            enable_message_ordering=True,
+        ))
+    time.sleep(10)
+
+    try:
+      test_messages = [
+          PubsubMessage(
+              b'order_data001', {'attr': 'value1'}, ordering_key='key1'),
+          PubsubMessage(
+              b'order_data002', {'attr': 'value2'}, ordering_key='key1'),
+          PubsubMessage(
+              b'order_data003', {'attr': 'value3'}, ordering_key='key2'),
+      ]
+
+      pipeline_options = PipelineOptions()
+      pipeline_options.view_as(StandardOptions).streaming = False
+
+      with TestPipeline(options=pipeline_options) as p:
+        messages = p | 'CreateMessages' >> Create(test_messages)
+        _ = messages | 'WriteToPubSub' >> WriteToPubSub(
+            ordering_topic.name,
+            with_attributes=True,
+            publish_with_ordering_key=True)
+
+      time.sleep(10)
+
+      # Retry pulling to handle PubSub delivery delays
+      received_messages = []
+      deadline = time.time() + 60  # wait up to 60 seconds
+      while time.time() < deadline:
+        response = self.sub_client.pull(
+            request={
+                'subscription': ordering_sub.name,
+                'max_messages': 10,
+            })
+        received_messages.extend(response.received_messages)
+        if len(received_messages) >= len(test_messages):
+          break
+        time.sleep(5)
+
+      self.assertEqual(len(received_messages), len(test_messages))
+
+      received_map = {
+          msg.message.data: msg.message
+          for msg in received_messages
+      }
+      self.assertEqual(received_map[b'order_data001'].ordering_key, 'key1')
+      self.assertEqual(received_map[b'order_data002'].ordering_key, 'key1')
+      self.assertEqual(received_map[b'order_data003'].ordering_key, 'key2')
+
+      ack_ids = [msg.ack_id for msg in received_messages]
+      self.sub_client.acknowledge(
+          request={
+              'subscription': ordering_sub.name,
+              'ack_ids': ack_ids,
+          })
+    finally:
+      self.sub_client.delete_subscription(
+          request={'subscription': ordering_sub.name})
+      self.pub_client.delete_topic(request={'topic': ordering_topic.name})
+
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.DEBUG)
diff --git a/sdks/python/apache_beam/io/gcp/pubsub_test.py 
b/sdks/python/apache_beam/io/gcp/pubsub_test.py
index 5650e920e63..c35de62fca1 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub_test.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub_test.py
@@ -467,6 +467,7 @@ class TestPubSubSink(unittest.TestCase):
         DisplayDataItemMatcher('id_label', 'id'),
         DisplayDataItemMatcher('with_attributes', True),
         DisplayDataItemMatcher('timestamp_attribute', 'time'),
+        DisplayDataItemMatcher('publish_with_ordering_key', False),
     ]
 
     hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
@@ -1098,6 +1099,77 @@ class TestWriteToPubSub(unittest.TestCase):
         Lineage.query(p.result.metrics(), Lineage.SINK),
         set(["pubsub:topic:fakeprj.a_topic"]))
 
+  def test_write_messages_with_ordering_key(self, mock_pubsub):
+    """Test WriteToPubSub with ordering_key in messages."""
+    data = b'data'
+    ordering_key = 'order-123'
+    attributes = {'key': 'value'}
+    payloads = [PubsubMessage(data, attributes, ordering_key=ordering_key)]
+
+    options = PipelineOptions([])
+    options.view_as(StandardOptions).streaming = True
+    with TestPipeline(options=options) as p:
+      _ = (
+          p
+          | Create(payloads)
+          | WriteToPubSub(
+              'projects/fakeprj/topics/a_topic',
+              with_attributes=True,
+              publish_with_ordering_key=True))
+
+    # Verify that publish was called with ordering_key
+    mock_pubsub.return_value.publish.assert_called()
+    call_args = mock_pubsub.return_value.publish.call_args
+
+    # Check that ordering_key was passed as a keyword argument
+    self.assertIn('ordering_key', call_args.kwargs)
+    self.assertEqual(call_args.kwargs['ordering_key'], ordering_key)
+
+  def test_write_messages_with_ordering_key_no_attributes(self, mock_pubsub):
+    """Test WriteToPubSub with ordering_key but no attributes."""
+    data = b'data'
+    ordering_key = 'order-456'
+    payloads = [PubsubMessage(data, None, ordering_key=ordering_key)]
+
+    options = PipelineOptions([])
+    options.view_as(StandardOptions).streaming = True
+    with TestPipeline(options=options) as p:
+      _ = (
+          p
+          | Create(payloads)
+          | WriteToPubSub(
+              'projects/fakeprj/topics/a_topic',
+              with_attributes=True,
+              publish_with_ordering_key=True))
+
+    # Verify that publish was called with ordering_key
+    mock_pubsub.return_value.publish.assert_called()
+    call_args = mock_pubsub.return_value.publish.call_args
+
+    # Check that ordering_key was passed
+    self.assertIn('ordering_key', call_args.kwargs)
+    self.assertEqual(call_args.kwargs['ordering_key'], ordering_key)
+
+  def test_write_messages_without_ordering_key(self, mock_pubsub):
+    """Test WriteToPubSub without ordering_key (backward compatibility)."""
+    data = b'data'
+    attributes = {'key': 'value'}
+    payloads = [PubsubMessage(data, attributes)]  # No ordering_key
+
+    options = PipelineOptions([])
+    options.view_as(StandardOptions).streaming = True
+    with TestPipeline(options=options) as p:
+      _ = (
+          p
+          | Create(payloads)
+          | WriteToPubSub(
+              'projects/fakeprj/topics/a_topic', with_attributes=True))
+
+    # Verify that publish was called
+    mock_pubsub.return_value.publish.assert_called()
+    call_args = mock_pubsub.return_value.publish.call_args
+    self.assertNotIn('ordering_key', call_args.kwargs)
+
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)

Reply via email to