tvalentyn commented on code in PR #37345:
URL: https://github.com/apache/beam/pull/37345#discussion_r3390670517
##########
sdks/python/apache_beam/io/gcp/pubsub.py:
##########
@@ -404,9 +406,13 @@ def __init__(
in a ReadFromPubSub PTransform to deduplicate messages.
timestamp_attribute: If set, will set an attribute for each Cloud Pub/Sub
message with the given name and the message's publish time as the
value.
+ enable_message_ordering: If True, enables message ordering on the
Review Comment:
Let's use consistent naming. It sounds like we are using
`publish_with_ordering_key` in Java and XLang, so we can stick with that.
##########
sdks/python/apache_beam/io/gcp/pubsub_integration_test.py:
##########
@@ -305,6 +305,96 @@ def test_batch_write_with_attributes(self):
"""Test WriteToPubSub in batch mode with attributes."""
self._test_batch_write(with_attributes=True)
+ @pytest.mark.it_postcommit
+ def test_batch_write_with_ordering_key(self):
+ """Test WriteToPubSub in batch mode with ordering keys.
+
+ Dataflow's Native Pub/Sub Sink does not support ordering_key
+ (see https://github.com/apache/beam/issues/36201), so this test
+ only applies to runners using Beam's Python WriteToPubSub Sink.
+ Dataflow users should use the XLang WriteToPubSub path instead
+ (apache_beam.io.external.gcp.pubsub.WriteToPubSub with
+ publish_with_ordering_key=True).
+ """
+ if self.runner_name == 'TestDataflowRunner':
+ self.skipTest(
+ 'Dataflow Native PubSub Sink does not support ordering_key '
Review Comment:
nit: given this is a test, the skip message can stop after the first line.
##########
sdks/python/apache_beam/io/gcp/pubsub_test.py:
##########
@@ -1098,6 +1097,71 @@ def test_write_to_pubsub_with_attributes_no_overwrite(self, unused_mock):
Lineage.query(p.result.metrics(), Lineage.SINK),
set(["pubsub:topic:fakeprj.a_topic"]))
+ def test_write_messages_with_ordering_key(self, mock_pubsub):
+ """Test WriteToPubSub with ordering_key in messages."""
+ data = b'data'
+ ordering_key = 'order-123'
+ attributes = {'key': 'value'}
+ payloads = [PubsubMessage(data, attributes, ordering_key=ordering_key)]
+
+ options = PipelineOptions([])
+ options.view_as(StandardOptions).streaming = True
+ with TestPipeline(options=options) as p:
+ _ = (
+ p
+ | Create(payloads)
+ | WriteToPubSub(
+ 'projects/fakeprj/topics/a_topic', with_attributes=True))
+
+ # Verify that publish was called with ordering_key
+ mock_pubsub.return_value.publish.assert_called()
+ call_args = mock_pubsub.return_value.publish.call_args
+
+ # Check that ordering_key was passed as a keyword argument
+ self.assertIn('ordering_key', call_args.kwargs)
+ self.assertEqual(call_args.kwargs['ordering_key'], ordering_key)
+
+ def test_write_messages_with_ordering_key_no_attributes(self, mock_pubsub):
+ """Test WriteToPubSub with ordering_key but no attributes."""
+ data = b'data'
+ ordering_key = 'order-456'
+ payloads = [PubsubMessage(data, None, ordering_key=ordering_key)]
+
+ options = PipelineOptions([])
+ options.view_as(StandardOptions).streaming = True
+ with TestPipeline(options=options) as p:
+ _ = (
+ p
+ | Create(payloads)
+ | WriteToPubSub(
+ 'projects/fakeprj/topics/a_topic', with_attributes=True))
+
+ # Verify that publish was called with ordering_key
+ mock_pubsub.return_value.publish.assert_called()
+ call_args = mock_pubsub.return_value.publish.call_args
+
+ # Check that ordering_key was passed
+ self.assertIn('ordering_key', call_args.kwargs)
+ self.assertEqual(call_args.kwargs['ordering_key'], ordering_key)
+
+ def test_write_messages_without_ordering_key(self, mock_pubsub):
+ """Test WriteToPubSub without ordering_key (backward compatibility)."""
+ data = b'data'
+ attributes = {'key': 'value'}
+ payloads = [PubsubMessage(data, attributes)] # No ordering_key
+
+ options = PipelineOptions([])
+ options.view_as(StandardOptions).streaming = True
+ with TestPipeline(options=options) as p:
+ _ = (
+ p
+ | Create(payloads)
+ | WriteToPubSub(
+ 'projects/fakeprj/topics/a_topic', with_attributes=True))
+
+ # Verify that publish was called
+ mock_pubsub.return_value.publish.assert_called()
Review Comment:
+1
##########
sdks/python/apache_beam/io/gcp/pubsub.py:
##########
@@ -430,7 +436,16 @@ def bytes_to_proto_str(element: Union[bytes, str]) -> bytes:
def expand(self, pcoll):
# Store pipeline options for use in DoFn
self.pipeline_options = pcoll.pipeline.options if pcoll.pipeline else None
-
+ # Warn Dataflow users to use the XLang path for ordering key support,
+ # since _PubSubWriteDoFn._flush() is not used by Dataflow's implementation.
+ runner = self.pipeline_options.get_all_options().get(
+ 'runner', '') if self.pipeline_options else ''
+ if 'Dataflow' in str(runner):
Review Comment:
Should we print this warning only if message ordering is enabled?
##########
sdks/python/apache_beam/io/gcp/pubsub_integration_test.py:
##########
@@ -305,6 +305,96 @@ def test_batch_write_with_attributes(self):
"""Test WriteToPubSub in batch mode with attributes."""
self._test_batch_write(with_attributes=True)
+ @pytest.mark.it_postcommit
+ def test_batch_write_with_ordering_key(self):
+ """Test WriteToPubSub in batch mode with ordering keys.
+
+ Dataflow's Native Pub/Sub Sink does not support ordering_key
+ (see https://github.com/apache/beam/issues/36201), so this test
+ only applies to runners using Beam's Python WriteToPubSub Sink.
+ Dataflow users should use the XLang WriteToPubSub path instead
+ (apache_beam.io.external.gcp.pubsub.WriteToPubSub with
+ publish_with_ordering_key=True).
+ """
+ if self.runner_name == 'TestDataflowRunner':
+ self.skipTest(
+ 'Dataflow Native PubSub Sink does not support ordering_key '
+ '(see https://github.com/apache/beam/issues/36201). '
+ 'Use apache_beam.io.external.gcp.pubsub.WriteToPubSub '
+ 'with publish_with_ordering_key=True instead.')
+ from google.pubsub_v1.types import Subscription
+
+ from apache_beam.options.pipeline_options import PipelineOptions
+ from apache_beam.options.pipeline_options import StandardOptions
+ from apache_beam.transforms import Create
+
+ ordering_topic = self.pub_client.create_topic(
+ name=self.pub_client.topic_path(
+ self.project, 'psit_topic_ordering' + self.uuid))
+ ordering_sub = self.sub_client.create_subscription(
+ request=Subscription(
+ name=self.sub_client.subscription_path(
+ self.project, 'psit_sub_ordering' + self.uuid),
+ topic=ordering_topic.name,
+ enable_message_ordering=True,
+ ))
+ time.sleep(10)
+
+ try:
+ test_messages = [
+ PubsubMessage(
+ b'order_data001', {'attr': 'value1'}, ordering_key='key1'),
+ PubsubMessage(
+ b'order_data002', {'attr': 'value2'}, ordering_key='key1'),
+ PubsubMessage(
+ b'order_data003', {'attr': 'value3'}, ordering_key='key2'),
+ ]
+
+ pipeline_options = PipelineOptions()
+ pipeline_options.view_as(StandardOptions).streaming = False
+
+ with TestPipeline(options=pipeline_options) as p:
+ messages = p | 'CreateMessages' >> Create(test_messages)
+ _ = messages | 'WriteToPubSub' >> WriteToPubSub(
+ ordering_topic.name, with_attributes=True)
Review Comment:
We need to enable ordering here, right? Does the test pass without it? If
so, how? Maybe the test isn't running?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]