shunping opened a new pull request, #39134: URL: https://github.com/apache/beam/pull/39134
PubSub guarantees at-least-once delivery, which can result in duplicate messages being pulled during the test. Deduplicate the received messages by message ID to prevent assertion failures on message count, while still acknowledging all received delivery attempts.

Failed test example: https://github.com/apache/beam/actions/runs/28270530203/job/83766777493?pr=39130

Traceback:

```
___________ PubSubIntegrationTest.test_batch_write_with_ordering_key ___________
[gw1] linux -- Python 3.12.13 /runner/_work/beam/beam/build/gradleenv/417525525/bin/python3

self = <apache_beam.io.gcp.pubsub_integration_test.PubSubIntegrationTest testMethod=test_batch_write_with_ordering_key>

    @pytest.mark.it_postcommit
    def test_batch_write_with_ordering_key(self):
      """Test WriteToPubSub in batch mode with ordering keys.

      Dataflow's Native Pub/Sub Sink does not support ordering_key (see
      https://github.com/apache/beam/issues/36201), so this test only applies to
      runners using Beam's Python WriteToPubSub Sink. Dataflow users should use
      the XLang WriteToPubSub path instead
      (apache_beam.io.external.gcp.pubsub.WriteToPubSub with
      publish_with_ordering_key=True).
      """
      if self.runner_name == 'TestDataflowRunner':
        self.skipTest(
            'Dataflow Native PubSub Sink does not support ordering_key '
            '(see https://github.com/apache/beam/issues/36201).')

      from google.pubsub_v1.types import Subscription

      from apache_beam.options.pipeline_options import PipelineOptions
      from apache_beam.options.pipeline_options import StandardOptions
      from apache_beam.transforms import Create

      ordering_topic = self.pub_client.create_topic(
          name=self.pub_client.topic_path(
              self.project, 'psit_topic_ordering' + self.uuid))
      ordering_sub = self.sub_client.create_subscription(
          request=Subscription(
              name=self.sub_client.subscription_path(
                  self.project, 'psit_sub_ordering' + self.uuid),
              topic=ordering_topic.name,
              enable_message_ordering=True,
          ))
      time.sleep(10)

      try:
        test_messages = [
            PubsubMessage(
                b'order_data001', {'attr': 'value1'}, ordering_key='key1'),
            PubsubMessage(
                b'order_data002', {'attr': 'value2'}, ordering_key='key1'),
            PubsubMessage(
                b'order_data003', {'attr': 'value3'}, ordering_key='key2'),
        ]

        pipeline_options = PipelineOptions()
        pipeline_options.view_as(StandardOptions).streaming = False

        with TestPipeline(options=pipeline_options) as p:
          messages = p | 'CreateMessages' >> Create(test_messages)
          _ = messages | 'WriteToPubSub' >> WriteToPubSub(
              ordering_topic.name,
              with_attributes=True,
              publish_with_ordering_key=True)

        time.sleep(10)

        # Retry pulling to handle PubSub delivery delays
        received_messages = []
        deadline = time.time() + 60  # wait up to 60 seconds
        while time.time() < deadline:
          response = self.sub_client.pull(
              request={
                  'subscription': ordering_sub.name,
                  'max_messages': 10,
              })
          received_messages.extend(response.received_messages)
          if len(received_messages) >= len(test_messages):
            break
          time.sleep(5)

>       self.assertEqual(len(received_messages), len(test_messages))
E       AssertionError: 4 != 3

apache_beam/io/gcp/pubsub_integration_test.py:377: AssertionError
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
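
The deduplication described in the pull request can be illustrated with a minimal sketch. It is not the actual diff from the PR, which is not shown in this message. The helper name `dedupe_received` and the `FakeMessage` / `FakeReceivedMessage` stand-in classes are hypothetical and exist only so the example runs without a Pub/Sub connection. The sketch assumes that each pulled delivery exposes an `ack_id` and a `message.message_id`, as Pub/Sub received messages do.

```python
from dataclasses import dataclass
from typing import Iterable, List, Tuple


@dataclass
class FakeMessage:
  """Hypothetical stand-in for a Pub/Sub message payload."""
  message_id: str
  data: bytes


@dataclass
class FakeReceivedMessage:
  """Hypothetical stand-in for one pulled delivery attempt."""
  ack_id: str
  message: FakeMessage


def dedupe_received(
    received: Iterable[FakeReceivedMessage]
) -> Tuple[List[FakeReceivedMessage], List[str]]:
  """Return (first delivery per message_id, ack_ids of every delivery)."""
  unique_by_id = {}
  ack_ids = []
  for rm in received:
    # Every delivery attempt gets acknowledged, duplicates included.
    ack_ids.append(rm.ack_id)
    # Only the first delivery of each message ID is kept for assertions.
    unique_by_id.setdefault(rm.message.message_id, rm)
  return list(unique_by_id.values()), ack_ids


# Simulated pull result: message 'm1' is delivered twice.
pulled = [
    FakeReceivedMessage('ack-1', FakeMessage('m1', b'order_data001')),
    FakeReceivedMessage('ack-2', FakeMessage('m2', b'order_data002')),
    FakeReceivedMessage('ack-3', FakeMessage('m1', b'order_data001')),
    FakeReceivedMessage('ack-4', FakeMessage('m3', b'order_data003')),
]
unique, ack_ids = dedupe_received(pulled)
print(len(unique), len(ack_ids))  # prints "3 4"
```

With this shape, a count assertion would compare `len(unique)` against the number of published messages, so a redelivery like the `4 != 3` case in the traceback no longer fails the test, while the full `ack_ids` list can still be passed to the subscriber client's acknowledge call.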
