shunping opened a new pull request, #39134:
URL: https://github.com/apache/beam/pull/39134

   Pub/Sub guarantees at-least-once delivery, so the same message can be
pulled more than once during the test. Deduplicate the received messages by
message ID so that the message-count assertion does not fail on redelivered
duplicates, while still acknowledging every received delivery attempt.
   
   Failed test example:
   
https://github.com/apache/beam/actions/runs/28270530203/job/83766777493?pr=39130
   
   Traceback:
   ```
   ___________ PubSubIntegrationTest.test_batch_write_with_ordering_key ___________
   [gw1] linux -- Python 3.12.13 /runner/_work/beam/beam/build/gradleenv/417525525/bin/python3
   
   self = <apache_beam.io.gcp.pubsub_integration_test.PubSubIntegrationTest testMethod=test_batch_write_with_ordering_key>
   
       @pytest.mark.it_postcommit
       def test_batch_write_with_ordering_key(self):
         """Test WriteToPubSub in batch mode with ordering keys.
       
         Dataflow's Native Pub/Sub Sink does not support ordering_key
         (see https://github.com/apache/beam/issues/36201), so this test
         only applies to runners using Beam's Python WriteToPubSub Sink.
         Dataflow users should use the XLang WriteToPubSub path instead
         (apache_beam.io.external.gcp.pubsub.WriteToPubSub with
         publish_with_ordering_key=True).
         """
         if self.runner_name == 'TestDataflowRunner':
           self.skipTest(
               'Dataflow Native PubSub Sink does not support ordering_key '
               '(see https://github.com/apache/beam/issues/36201).')
         from google.pubsub_v1.types import Subscription
       
         from apache_beam.options.pipeline_options import PipelineOptions
         from apache_beam.options.pipeline_options import StandardOptions
         from apache_beam.transforms import Create
       
         ordering_topic = self.pub_client.create_topic(
             name=self.pub_client.topic_path(
                 self.project, 'psit_topic_ordering' + self.uuid))
         ordering_sub = self.sub_client.create_subscription(
             request=Subscription(
                 name=self.sub_client.subscription_path(
                     self.project, 'psit_sub_ordering' + self.uuid),
                 topic=ordering_topic.name,
                 enable_message_ordering=True,
             ))
         time.sleep(10)
       
         try:
           test_messages = [
               PubsubMessage(
                   b'order_data001', {'attr': 'value1'}, ordering_key='key1'),
               PubsubMessage(
                   b'order_data002', {'attr': 'value2'}, ordering_key='key1'),
               PubsubMessage(
                   b'order_data003', {'attr': 'value3'}, ordering_key='key2'),
           ]
       
           pipeline_options = PipelineOptions()
           pipeline_options.view_as(StandardOptions).streaming = False
       
           with TestPipeline(options=pipeline_options) as p:
             messages = p | 'CreateMessages' >> Create(test_messages)
             _ = messages | 'WriteToPubSub' >> WriteToPubSub(
                 ordering_topic.name,
                 with_attributes=True,
                 publish_with_ordering_key=True)
       
           time.sleep(10)
       
           # Retry pulling to handle PubSub delivery delays
           received_messages = []
           deadline = time.time() + 60  # wait up to 60 seconds
           while time.time() < deadline:
             response = self.sub_client.pull(
                 request={
                     'subscription': ordering_sub.name,
                     'max_messages': 10,
                 })
             received_messages.extend(response.received_messages)
             if len(received_messages) >= len(test_messages):
               break
             time.sleep(5)
       
   >       self.assertEqual(len(received_messages), len(test_messages))
   E       AssertionError: 4 != 3
   
   apache_beam/io/gcp/pubsub_integration_test.py:377: AssertionError
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to