tvalentyn commented on code in PR #37345:
URL: https://github.com/apache/beam/pull/37345#discussion_r3415677614
##########
sdks/python/apache_beam/io/gcp/pubsub_integration_test.py:
##########
@@ -305,6 +305,96 @@ def test_batch_write_with_attributes(self):
"""Test WriteToPubSub in batch mode with attributes."""
self._test_batch_write(with_attributes=True)
+ @pytest.mark.it_postcommit
+ def test_batch_write_with_ordering_key(self):
+ """Test WriteToPubSub in batch mode with ordering keys.
+
+ Dataflow's Native Pub/Sub Sink does not support ordering_key
+ (see https://github.com/apache/beam/issues/36201), so this test
+ only applies to runners using Beam's Python WriteToPubSub Sink.
+ Dataflow users should use the XLang WriteToPubSub path instead
+ (apache_beam.io.external.gcp.pubsub.WriteToPubSub with
+ publish_with_ordering_key=True).
+ """
+ if self.runner_name == 'TestDataflowRunner':
+ self.skipTest(
+ 'Dataflow Native PubSub Sink does not support ordering_key '
+ '(see https://github.com/apache/beam/issues/36201). '
+ 'Use apache_beam.io.external.gcp.pubsub.WriteToPubSub '
+ 'with publish_with_ordering_key=True instead.')
+ from google.pubsub_v1.types import Subscription
+
+ from apache_beam.options.pipeline_options import PipelineOptions
+ from apache_beam.options.pipeline_options import StandardOptions
+ from apache_beam.transforms import Create
+
+ ordering_topic = self.pub_client.create_topic(
+ name=self.pub_client.topic_path(
+ self.project, 'psit_topic_ordering' + self.uuid))
+ ordering_sub = self.sub_client.create_subscription(
+ request=Subscription(
+ name=self.sub_client.subscription_path(
+ self.project, 'psit_sub_ordering' + self.uuid),
+ topic=ordering_topic.name,
+ enable_message_ordering=True,
+ ))
+ time.sleep(10)
+
+ try:
+ test_messages = [
+ PubsubMessage(
+ b'order_data001', {'attr': 'value1'}, ordering_key='key1'),
+ PubsubMessage(
+ b'order_data002', {'attr': 'value2'}, ordering_key='key1'),
+ PubsubMessage(
+ b'order_data003', {'attr': 'value3'}, ordering_key='key2'),
+ ]
+
+ pipeline_options = PipelineOptions()
+ pipeline_options.view_as(StandardOptions).streaming = False
+
+ with TestPipeline(options=pipeline_options) as p:
+ messages = p | 'CreateMessages' >> Create(test_messages)
+ _ = messages | 'WriteToPubSub' >> WriteToPubSub(
+ ordering_topic.name, with_attributes=True)
Review Comment:
thanks, do you have thoughts on why the test didn't catch this omission
before?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]