gemini-code-assist[bot] commented on code in PR #39134:
URL: https://github.com/apache/beam/pull/39134#discussion_r3485803338


##########
sdks/python/apache_beam/io/gcp/pubsub_integration_test.py:
##########
@@ -384,12 +392,12 @@ def test_batch_write_with_ordering_key(self):
       self.assertEqual(received_map[b'order_data002'].ordering_key, 'key1')
       self.assertEqual(received_map[b'order_data003'].ordering_key, 'key2')
 
-      ack_ids = [msg.ack_id for msg in received_messages]
-      self.sub_client.acknowledge(
-          request={
-              'subscription': ordering_sub.name,
-              'ack_ids': ack_ids,
-          })
+      if ack_ids:
+        self.sub_client.acknowledge(
+            request={
+                'subscription': ordering_sub.name,
+                'ack_ids': ack_ids,
+            })

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Once messages are acknowledged immediately inside the pull loop (as suggested in the comment on the retry loop), this post-loop acknowledgment block is redundant and can be safely removed.



##########
sdks/python/apache_beam/io/gcp/pubsub_integration_test.py:
##########
@@ -362,14 +362,22 @@ def test_batch_write_with_ordering_key(self):
 
       # Retry pulling to handle PubSub delivery delays
       received_messages = []
+      received_message_ids = set()
+      ack_ids = []
       deadline = time.time() + 60  # wait up to 60 seconds
       while time.time() < deadline:
         response = self.sub_client.pull(
             request={
                 'subscription': ordering_sub.name,
                 'max_messages': 10,
             })
-        received_messages.extend(response.received_messages)
+        for msg in response.received_messages:
+          ack_ids.append(msg.ack_id)
+          # Pub/Sub guarantees at-least-once delivery, so we must deduplicate
+          # messages by message_id to handle potential duplicate deliveries.
+          if msg.message.message_id not in received_message_ids:
+            received_message_ids.add(msg.message.message_id)
+            received_messages.append(msg)
         if len(received_messages) >= len(test_messages):
           break
         time.sleep(5)

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Acknowledging received messages immediately inside the pull loop is highly recommended. The retry loop can run for up to 60 seconds, which exceeds the default Pub/Sub acknowledgment deadline of 10 seconds (used when the subscription does not set `ack_deadline_seconds`). Messages pulled early in the loop but acknowledged only after it finishes can miss that deadline and be redelivered by Pub/Sub. This causes unnecessary duplicate deliveries and potential test flakiness.
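
   To make the timing argument concrete, here is a small self-contained simulation that runs without GCP. It is a sketch under stated assumptions: `FakeSubscription` and `simulate` are hypothetical names, the fake redelivers a message as soon as its lease expires (real Pub/Sub redelivery timing is less precise), ordering keys are not modeled, and the arrival times (two messages at t=0, one delayed to t=25) are invented for illustration. It mirrors the test's 60-second deadline and 5-second poll interval using a simulated clock.

```python
from typing import Dict, List, Tuple


class FakeSubscription:
  """Hypothetical in-memory model of at-least-once delivery.

  Not the real Pub/Sub client: each pulled message is leased for
  `ack_deadline` seconds, and a message still unacknowledged when its lease
  expires is handed out again by a later pull.
  """

  def __init__(self, arrivals: Dict[str, float], ack_deadline: float = 10):
    self.arrivals = arrivals  # message_id -> time it becomes pullable
    self.ack_deadline = ack_deadline
    self.lease_expiry: Dict[str, float] = {}
    self.acked = set()
    self.ack_id_to_msg: Dict[str, str] = {}
    self.deliveries = 0  # total deliveries, including redeliveries

  def pull(self, now: float, max_messages: int = 10) -> List[Tuple[str, str]]:
    batch = []
    for msg_id, arrival in self.arrivals.items():
      if len(batch) >= max_messages:
        break
      if msg_id in self.acked or arrival > now:
        continue  # already acknowledged, or not published yet
      if now < self.lease_expiry.get(msg_id, float('-inf')):
        continue  # still leased to the subscriber
      self.lease_expiry[msg_id] = now + self.ack_deadline
      ack_id = f'ack{len(self.ack_id_to_msg)}'  # fresh ack_id per delivery
      self.ack_id_to_msg[ack_id] = msg_id
      batch.append((msg_id, ack_id))
    self.deliveries += len(batch)
    return batch

  def acknowledge(self, ack_ids: List[str]) -> None:
    self.acked.update(self.ack_id_to_msg[a] for a in ack_ids)


def simulate(ack_in_loop: bool, timeout: float = 60, poll_interval: float = 5):
  # Assumed timings: two messages are pullable at t=0, the third at t=25.
  sub = FakeSubscription({'m1': 0, 'm2': 0, 'm3': 25})
  seen = set()
  pending_ack_ids = []
  now = 0.0
  deadline = now + timeout
  while now < deadline:
    batch = sub.pull(now)
    ack_ids = [ack_id for _, ack_id in batch]
    if ack_in_loop:
      if ack_ids:
        sub.acknowledge(ack_ids)  # ack right after each pull
    else:
      pending_ack_ids.extend(ack_ids)  # defer, as the current test does
    seen.update(msg_id for msg_id, _ in batch)  # deduplicate by message_id
    if len(seen) >= 3:
      break
    now += poll_interval  # simulated time.sleep(5)
  if pending_ack_ids:
    sub.acknowledge(pending_ack_ids)
  return len(seen), sub.deliveries


print('ack at end: ', simulate(ack_in_loop=False))
print('ack in loop:', simulate(ack_in_loop=True))
```

   With these assumed timings both variants end with all 3 unique messages, but deferring the acknowledgment produces 7 deliveries (m1 and m2 are redelivered at t=10 and t=20) versus 3 when acknowledging inside the loop.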
   
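   Acknowledging immediately inside the loop avoids these deadline-driven redeliveries (duplicates can still occur under at-least-once delivery, which is why the `message_id` deduplication stays) and simplifies the code by removing the need to accumulate `ack_ids` and perform a bulk acknowledgment at the end.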
   
   ```python
         deadline = time.time() + 60  # wait up to 60 seconds
         while time.time() < deadline:
           response = self.sub_client.pull(
               request={
                   'subscription': ordering_sub.name,
                   'max_messages': 10,
               })
           if response.received_messages:
             self.sub_client.acknowledge(
                 request={
                     'subscription': ordering_sub.name,
                     'ack_ids': [
                         msg.ack_id for msg in response.received_messages
                     ],
                 })
             for msg in response.received_messages:
               # Pub/Sub guarantees at-least-once delivery, so we must deduplicate
               # messages by message_id to handle potential duplicate deliveries.
               if msg.message.message_id not in received_message_ids:
                 received_message_ids.add(msg.message.message_id)
                 received_messages.append(msg)
           if len(received_messages) >= len(test_messages):
             break
           time.sleep(5)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
