This is an automated email from the ASF dual-hosted git repository.
tvalentyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 0bc785253eb Fix WriteToPubSub to pass ordering_key to publish() method
(#37345)
0bc785253eb is described below
commit 0bc785253ebb7622496f134934031fd07a68c8bc
Author: Nikita Grover <[email protected]>
AuthorDate: Wed Jun 17 07:07:29 2026 +0530
Fix WriteToPubSub to pass ordering_key to publish() method (#37345)
* Fix WriteToPubSub to pass ordering_key to publish() method
Fixes #36201
* Update pubsub_test.py
* Update pubsub_test.py
* Trigger CI rerun
* Retry CI (flake)
* Apply yapf formatting
* Address review comments: use message_to_proto_str and skip ordering key
test on Dataflow
* Add Dataflow warning in WriteToPubSub.expand() for ordering key support
* Update pubsub_integration_test.py
* Update PR and modification in beam_PostCommit_Python.json
Updated PR number and modification count.
* Update pubsub_integration_test.py
* Update pubsub_integration_test.py
* Fix PubSub error
* Update pubsub_integration_test.py
* Fix formatting: remove trailing whitespace
* Fix ordering key integration test: retry pull loop, fix indentation
* Fix import order: google imports before apache_beam (isort)
* Fix import ordering in pubsub_integration_test.py
* Simplify publish kwargs, conditionally enable message ordering, add retry
loop in integration test
* Use with_attributes to initialize with_ordering and apply in
PublisherClient setup
* Fix PubSub tests: enable message ordering in PublisherClient to exercise
full ordering flow
* Rename to publish_with_ordering_key, gate Dataflow warning, fix test
assertions
---------
Co-authored-by: Nikita Grover <[email protected]>
---
.github/trigger_files/beam_PostCommit_Python.json | 4 +-
.../beam/sdk/io/gcp/pubsub/ExternalWrite.java | 6 ++
.../apache/beam/sdk/io/gcp/pubsub/PubsubIO.java | 5 +-
sdks/python/apache_beam/io/external/gcp/pubsub.py | 11 ++-
sdks/python/apache_beam/io/gcp/pubsub.py | 48 +++++++++---
.../apache_beam/io/gcp/pubsub_integration_test.py | 90 ++++++++++++++++++++++
sdks/python/apache_beam/io/gcp/pubsub_test.py | 72 +++++++++++++++++
7 files changed, 219 insertions(+), 17 deletions(-)
diff --git a/.github/trigger_files/beam_PostCommit_Python.json b/.github/trigger_files/beam_PostCommit_Python.json
index def2ac98408..11064375d62 100644
--- a/.github/trigger_files/beam_PostCommit_Python.json
+++ b/.github/trigger_files/beam_PostCommit_Python.json
@@ -1,5 +1,5 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to
run.",
- "pr": "38069",
+ "pr": "37345",
"modification": 49
-}
+}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java
index 658d1fc29e3..23b14b68a08 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java
@@ -53,6 +53,7 @@ public final class ExternalWrite implements ExternalTransformRegistrar {
private String topic;
private @Nullable String idAttribute;
private @Nullable String timestampAttribute;
+ private boolean publishWithOrderingKey = false;
public void setTopic(String topic) {
this.topic = topic;
@@ -65,6 +66,10 @@ public final class ExternalWrite implements ExternalTransformRegistrar {
public void setTimestampAttribute(@Nullable String timestampAttribute) {
this.timestampAttribute = timestampAttribute;
}
+
+ public void setPublishWithOrderingKey(Boolean publishWithOrderingKey) {
+ this.publishWithOrderingKey = publishWithOrderingKey != null &&
publishWithOrderingKey;
+ }
}
public static class WriteBuilder
@@ -85,6 +90,7 @@ public final class ExternalWrite implements ExternalTransformRegistrar {
if (config.timestampAttribute != null) {
writeBuilder.setTimestampAttribute(config.timestampAttribute);
}
+ writeBuilder.setPublishWithOrderingKey(config.publishWithOrderingKey);
writeBuilder.setDynamicDestinations(false);
return writeBuilder.build();
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
index d62d294ed2a..57005745044 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
@@ -1727,7 +1727,10 @@ public class PubsubIO {
this.pubsubClient =
getPubsubClientFactory()
.newClient(
- getTimestampAttribute(), null,
c.getPipelineOptions().as(PubsubOptions.class));
+ getTimestampAttribute(),
+ null,
+ c.getPipelineOptions().as(PubsubOptions.class),
+ Write.this.getPubsubRootUrl());
}
@ProcessElement
diff --git a/sdks/python/apache_beam/io/external/gcp/pubsub.py b/sdks/python/apache_beam/io/external/gcp/pubsub.py
index a2a3430f9a1..3125c042227 100644
--- a/sdks/python/apache_beam/io/external/gcp/pubsub.py
+++ b/sdks/python/apache_beam/io/external/gcp/pubsub.py
@@ -117,6 +117,7 @@ WriteToPubsubSchema = typing.NamedTuple(
# this is not implemented yet on the Java side:
# ('with_attributes', bool),
('timestamp_attribute', typing.Optional[str]),
+ ('publish_with_ordering_key', bool),
])
@@ -135,6 +136,7 @@ class WriteToPubSub(beam.PTransform):
with_attributes=False,
id_label=None,
timestamp_attribute=None,
+ publish_with_ordering_key=False,
expansion_service=None):
"""Initializes ``WriteToPubSub``.
@@ -150,18 +152,23 @@ class WriteToPubSub(beam.PTransform):
in a ReadFromPubSub PTransform to deduplicate messages.
timestamp_attribute: If set, will set an attribute for each Cloud Pub/Sub
message with the given name and the message's publish time as the
value.
+ publish_with_ordering_key: If True, enables ordering key support when
+ publishing messages. The ordering key must be set on each
+ PubsubMessage via the ``ordering_key`` attribute.
"""
self.params = WriteToPubsubSchema(
topic=topic,
id_label=id_label,
# with_attributes=with_attributes,
- timestamp_attribute=timestamp_attribute)
+ timestamp_attribute=timestamp_attribute,
+ publish_with_ordering_key=publish_with_ordering_key)
self.expansion_service = expansion_service
self.with_attributes = with_attributes
def expand(self, pvalue):
if self.with_attributes:
- pcoll = pvalue | 'ToProto' >> Map(pubsub.WriteToPubSub.to_proto_str)
+ pcoll = pvalue | 'ToProto' >> Map(
+ pubsub.WriteToPubSub.message_to_proto_str)
else:
pcoll = pvalue | 'ToProto' >> Map(
lambda x: pubsub.PubsubMessage(x, {})._to_proto_str())
diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py b/sdks/python/apache_beam/io/gcp/pubsub.py
index 276103f5276..55856f50478 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub.py
@@ -31,8 +31,9 @@ https://github.com/apache/beam/blob/master/sdks/python/OWNERS
"""
# pytype: skip-file
-
+import logging
import re
+import time
from typing import Any
from typing import NamedTuple
from typing import Optional
@@ -388,7 +389,8 @@ class WriteToPubSub(PTransform):
topic: str,
with_attributes: bool = False,
id_label: Optional[str] = None,
- timestamp_attribute: Optional[str] = None) -> None:
+ timestamp_attribute: Optional[str] = None,
+ publish_with_ordering_key: bool = False) -> None:
"""Initializes ``WriteToPubSub``.
Args:
@@ -404,9 +406,13 @@ class WriteToPubSub(PTransform):
in a ReadFromPubSub PTransform to deduplicate messages.
timestamp_attribute: If set, will set an attribute for each Cloud Pub/Sub
message with the given name and the message's publish time as the
value.
+ publish_with_ordering_key: If True, enables message ordering on the
+ PublisherClient. Messages with an ordering_key will be delivered
+ in order. Requires messages to have ordering_key set.
"""
super().__init__()
self.with_attributes = with_attributes
+ self.publish_with_ordering_key = publish_with_ordering_key
self.id_label = id_label
self.timestamp_attribute = timestamp_attribute
self.project, self.topic_name = parse_topic(topic)
@@ -430,7 +436,16 @@ class WriteToPubSub(PTransform):
def expand(self, pcoll):
# Store pipeline options for use in DoFn
self.pipeline_options = pcoll.pipeline.options if pcoll.pipeline else None
-
+ # Warn Dataflow users to use the XLang path for ordering key support,
+ # since _PubSubWriteDoFn._flush() is not used by Dataflow's implementation.
+ runner = self.pipeline_options.get_all_options().get(
+ 'runner', '') if self.pipeline_options else ''
+ if 'Dataflow' in str(runner) and self.publish_with_ordering_key:
+ logging.warning(
+ 'WriteToPubSub ordering_key support is not available on Dataflow '
+ 'via this transform. Use the XLang WriteToPubSub path instead: '
+ 'apache_beam.io.external.gcp.pubsub.WriteToPubSub with '
+ 'publish_with_ordering_key=True.')
if self.with_attributes:
pcoll = pcoll | 'ToProtobufX' >> ParDo(
_AddMetricsAndMap(
@@ -457,6 +472,9 @@ class WriteToPubSub(PTransform):
True, label='With Attributes').drop_if_none(),
'timestamp_attribute': DisplayDataItem(
self.timestamp_attribute, label='Timestamp Attribute'),
+ 'publish_with_ordering_key': DisplayDataItem(
+ self.publish_with_ordering_key,
+ label='Publish With Ordering Key').drop_if_none(),
}
@@ -563,6 +581,7 @@ class _PubSubWriteDoFn(DoFn):
self.id_label = transform.id_label
self.timestamp_attribute = transform.timestamp_attribute
self.with_attributes = transform.with_attributes
+ self.with_ordering = transform.publish_with_ordering_key
# TODO(https://github.com/apache/beam/issues/18939): Add support for
# id_label and timestamp_attribute.
@@ -597,7 +616,7 @@ class _PubSubWriteDoFn(DoFn):
output_labels_supported = False
# Log debug information for troubleshooting
- import logging
+
runner_info = getattr(
pipeline_options, 'runner',
'None') if pipeline_options else 'No options'
@@ -628,7 +647,13 @@ class _PubSubWriteDoFn(DoFn):
def setup(self):
from google.cloud import pubsub
- self._pub_client = pubsub.PublisherClient()
+ if self.with_ordering:
+ self._pub_client = pubsub.PublisherClient(
+ publisher_options=pubsub.types.PublisherOptions(
+ enable_message_ordering=True,
+ ))
+ else:
+ self._pub_client = pubsub.PublisherClient()
self._topic = self._pub_client.topic_path(
self.project, self.short_topic_name)
@@ -647,8 +672,6 @@ class _PubSubWriteDoFn(DoFn):
if not self._buffer:
return
- import time
-
# The elements in buffer are serialized protobuf bytes from the previous
# transforms. We need to deserialize them to extract data and attributes.
futures = []
@@ -656,12 +679,13 @@ class _PubSubWriteDoFn(DoFn):
# Deserialize the protobuf to get the original PubsubMessage
pubsub_msg = PubsubMessage._from_proto_str(elem)
- # Publish with the correct data and attributes
+ # Publish with the correct data, attributes, and ordering_key
+ kwargs = {}
if self.with_attributes and pubsub_msg.attributes:
- future = self._pub_client.publish(
- self._topic, pubsub_msg.data, **pubsub_msg.attributes)
- else:
- future = self._pub_client.publish(self._topic, pubsub_msg.data)
+ kwargs.update(pubsub_msg.attributes)
+ if pubsub_msg.ordering_key:
+ kwargs['ordering_key'] = pubsub_msg.ordering_key
+ future = self._pub_client.publish(self._topic, pubsub_msg.data, **kwargs)
futures.append(future)
diff --git a/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py b/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py
index 8387fe734fc..e67c5f2a370 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py
@@ -305,6 +305,96 @@ class PubSubIntegrationTest(unittest.TestCase):
"""Test WriteToPubSub in batch mode with attributes."""
self._test_batch_write(with_attributes=True)
+ @pytest.mark.it_postcommit
+ def test_batch_write_with_ordering_key(self):
+ """Test WriteToPubSub in batch mode with ordering keys.
+
+ Dataflow's Native Pub/Sub Sink does not support ordering_key
+ (see https://github.com/apache/beam/issues/36201), so this test
+ only applies to runners using Beam's Python WriteToPubSub Sink.
+ Dataflow users should use the XLang WriteToPubSub path instead
+ (apache_beam.io.external.gcp.pubsub.WriteToPubSub with
+ publish_with_ordering_key=True).
+ """
+ if self.runner_name == 'TestDataflowRunner':
+ self.skipTest(
+ 'Dataflow Native PubSub Sink does not support ordering_key '
+ '(see https://github.com/apache/beam/issues/36201).')
+ from google.pubsub_v1.types import Subscription
+
+ from apache_beam.options.pipeline_options import PipelineOptions
+ from apache_beam.options.pipeline_options import StandardOptions
+ from apache_beam.transforms import Create
+
+ ordering_topic = self.pub_client.create_topic(
+ name=self.pub_client.topic_path(
+ self.project, 'psit_topic_ordering' + self.uuid))
+ ordering_sub = self.sub_client.create_subscription(
+ request=Subscription(
+ name=self.sub_client.subscription_path(
+ self.project, 'psit_sub_ordering' + self.uuid),
+ topic=ordering_topic.name,
+ enable_message_ordering=True,
+ ))
+ time.sleep(10)
+
+ try:
+ test_messages = [
+ PubsubMessage(
+ b'order_data001', {'attr': 'value1'}, ordering_key='key1'),
+ PubsubMessage(
+ b'order_data002', {'attr': 'value2'}, ordering_key='key1'),
+ PubsubMessage(
+ b'order_data003', {'attr': 'value3'}, ordering_key='key2'),
+ ]
+
+ pipeline_options = PipelineOptions()
+ pipeline_options.view_as(StandardOptions).streaming = False
+
+ with TestPipeline(options=pipeline_options) as p:
+ messages = p | 'CreateMessages' >> Create(test_messages)
+ _ = messages | 'WriteToPubSub' >> WriteToPubSub(
+ ordering_topic.name,
+ with_attributes=True,
+ publish_with_ordering_key=True)
+
+ time.sleep(10)
+
+ # Retry pulling to handle PubSub delivery delays
+ received_messages = []
+ deadline = time.time() + 60 # wait up to 60 seconds
+ while time.time() < deadline:
+ response = self.sub_client.pull(
+ request={
+ 'subscription': ordering_sub.name,
+ 'max_messages': 10,
+ })
+ received_messages.extend(response.received_messages)
+ if len(received_messages) >= len(test_messages):
+ break
+ time.sleep(5)
+
+ self.assertEqual(len(received_messages), len(test_messages))
+
+ received_map = {
+ msg.message.data: msg.message
+ for msg in received_messages
+ }
+ self.assertEqual(received_map[b'order_data001'].ordering_key, 'key1')
+ self.assertEqual(received_map[b'order_data002'].ordering_key, 'key1')
+ self.assertEqual(received_map[b'order_data003'].ordering_key, 'key2')
+
+ ack_ids = [msg.ack_id for msg in received_messages]
+ self.sub_client.acknowledge(
+ request={
+ 'subscription': ordering_sub.name,
+ 'ack_ids': ack_ids,
+ })
+ finally:
+ self.sub_client.delete_subscription(
+ request={'subscription': ordering_sub.name})
+ self.pub_client.delete_topic(request={'topic': ordering_topic.name})
+
if __name__ == '__main__':
logging.getLogger().setLevel(logging.DEBUG)
diff --git a/sdks/python/apache_beam/io/gcp/pubsub_test.py b/sdks/python/apache_beam/io/gcp/pubsub_test.py
index 5650e920e63..c35de62fca1 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub_test.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub_test.py
@@ -467,6 +467,7 @@ class TestPubSubSink(unittest.TestCase):
DisplayDataItemMatcher('id_label', 'id'),
DisplayDataItemMatcher('with_attributes', True),
DisplayDataItemMatcher('timestamp_attribute', 'time'),
+ DisplayDataItemMatcher('publish_with_ordering_key', False),
]
hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
@@ -1098,6 +1099,77 @@ class TestWriteToPubSub(unittest.TestCase):
Lineage.query(p.result.metrics(), Lineage.SINK),
set(["pubsub:topic:fakeprj.a_topic"]))
+ def test_write_messages_with_ordering_key(self, mock_pubsub):
+ """Test WriteToPubSub with ordering_key in messages."""
+ data = b'data'
+ ordering_key = 'order-123'
+ attributes = {'key': 'value'}
+ payloads = [PubsubMessage(data, attributes, ordering_key=ordering_key)]
+
+ options = PipelineOptions([])
+ options.view_as(StandardOptions).streaming = True
+ with TestPipeline(options=options) as p:
+ _ = (
+ p
+ | Create(payloads)
+ | WriteToPubSub(
+ 'projects/fakeprj/topics/a_topic',
+ with_attributes=True,
+ publish_with_ordering_key=True))
+
+ # Verify that publish was called with ordering_key
+ mock_pubsub.return_value.publish.assert_called()
+ call_args = mock_pubsub.return_value.publish.call_args
+
+ # Check that ordering_key was passed as a keyword argument
+ self.assertIn('ordering_key', call_args.kwargs)
+ self.assertEqual(call_args.kwargs['ordering_key'], ordering_key)
+
+ def test_write_messages_with_ordering_key_no_attributes(self, mock_pubsub):
+ """Test WriteToPubSub with ordering_key but no attributes."""
+ data = b'data'
+ ordering_key = 'order-456'
+ payloads = [PubsubMessage(data, None, ordering_key=ordering_key)]
+
+ options = PipelineOptions([])
+ options.view_as(StandardOptions).streaming = True
+ with TestPipeline(options=options) as p:
+ _ = (
+ p
+ | Create(payloads)
+ | WriteToPubSub(
+ 'projects/fakeprj/topics/a_topic',
+ with_attributes=True,
+ publish_with_ordering_key=True))
+
+ # Verify that publish was called with ordering_key
+ mock_pubsub.return_value.publish.assert_called()
+ call_args = mock_pubsub.return_value.publish.call_args
+
+ # Check that ordering_key was passed
+ self.assertIn('ordering_key', call_args.kwargs)
+ self.assertEqual(call_args.kwargs['ordering_key'], ordering_key)
+
+ def test_write_messages_without_ordering_key(self, mock_pubsub):
+ """Test WriteToPubSub without ordering_key (backward compatibility)."""
+ data = b'data'
+ attributes = {'key': 'value'}
+ payloads = [PubsubMessage(data, attributes)] # No ordering_key
+
+ options = PipelineOptions([])
+ options.view_as(StandardOptions).streaming = True
+ with TestPipeline(options=options) as p:
+ _ = (
+ p
+ | Create(payloads)
+ | WriteToPubSub(
+ 'projects/fakeprj/topics/a_topic', with_attributes=True))
+
+ # Verify that publish was called
+ mock_pubsub.return_value.publish.assert_called()
+ call_args = mock_pubsub.return_value.publish.call_args
+ self.assertNotIn('ordering_key', call_args.kwargs)
+
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)