shunping commented on code in PR #39134:
URL: https://github.com/apache/beam/pull/39134#discussion_r3492391828


##########
sdks/python/apache_beam/io/gcp/pubsub_integration_test.py:
##########
@@ -362,14 +362,22 @@ def test_batch_write_with_ordering_key(self):
 
       # Retry pulling to handle PubSub delivery delays
       received_messages = []
+      received_message_ids = set()
+      ack_ids = []
       deadline = time.time() + 60  # wait up to 60 seconds
       while time.time() < deadline:
         response = self.sub_client.pull(
             request={
                 'subscription': ordering_sub.name,
                 'max_messages': 10,
             })
-        received_messages.extend(response.received_messages)
+        for msg in response.received_messages:
+          ack_ids.append(msg.ack_id)
+          # Pub/Sub guarantees at-least-once delivery, so we must deduplicate
+          # messages by message_id to handle potential duplicate deliveries.
+          if msg.message.message_id not in received_message_ids:
+            received_message_ids.add(msg.message.message_id)
+            received_messages.append(msg)
         if len(received_messages) >= len(test_messages):
           break
         time.sleep(5)

Review Comment:
   Even if the messages are redelivered, they will be dedup'ed by message_id in the loop.
Therefore, redelivery shouldn't cause test flakiness.
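To make the point concrete, here is a minimal, self-contained sketch of the same dedup loop. It does not talk to Pub/Sub. `FakeMessage`, `FakeReceivedMessage`, and `collect_unique` are hypothetical stand-ins, assumed to model only the two fields the test uses (`ack_id` and `message.message_id`) of the client's `ReceivedMessage`. A redelivered message (same `message_id`, new `ack_id`) is counted once, while its `ack_id` is still recorded.

```python
from dataclasses import dataclass
from typing import Iterable, List, Set, Tuple


# Hypothetical stand-ins for the Pub/Sub ReceivedMessage / PubsubMessage
# types; only the fields the test's loop reads are modeled.
@dataclass(frozen=True)
class FakeMessage:
  message_id: str
  data: bytes


@dataclass(frozen=True)
class FakeReceivedMessage:
  ack_id: str
  message: FakeMessage


def collect_unique(
    batches: Iterable[List[FakeReceivedMessage]],
    expected: int) -> Tuple[List[FakeReceivedMessage], List[str]]:
  """Mirrors the test loop: record every ack_id, keep one copy per message_id."""
  received: List[FakeReceivedMessage] = []
  seen_ids: Set[str] = set()
  ack_ids: List[str] = []
  for batch in batches:  # each batch plays the role of one pull() response
    for msg in batch:
      ack_ids.append(msg.ack_id)  # every delivery has its own ack_id
      if msg.message.message_id not in seen_ids:
        seen_ids.add(msg.message.message_id)
        received.append(msg)
    if len(received) >= expected:
      break
  return received, ack_ids


# Two simulated pulls; "m1" is redelivered in the second one with a new ack_id.
batches = [
    [FakeReceivedMessage('ack-1', FakeMessage('m1', b'a')),
     FakeReceivedMessage('ack-2', FakeMessage('m2', b'b'))],
    [FakeReceivedMessage('ack-3', FakeMessage('m1', b'a')),
     FakeReceivedMessage('ack-4', FakeMessage('m3', b'c'))],
]
received, ack_ids = collect_unique(batches, expected=3)
print([m.message.message_id for m in received])  # ['m1', 'm2', 'm3']
print(ack_ids)  # ['ack-1', 'ack-2', 'ack-3', 'ack-4']
```

The duplicate never inflates `received`, so the `len(received_messages) >= len(test_messages)` exit condition and any later count assertion see each message once. Collecting the duplicate's `ack_id` as well presumably lets the test acknowledge every delivery afterwards; that step is outside the hunk shown.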



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.