gemini-code-assist[bot] commented on code in PR #39134:
URL: https://github.com/apache/beam/pull/39134#discussion_r3485803338
##########
sdks/python/apache_beam/io/gcp/pubsub_integration_test.py:
##########
@@ -384,12 +392,12 @@ def test_batch_write_with_ordering_key(self):
     self.assertEqual(received_map[b'order_data002'].ordering_key, 'key1')
     self.assertEqual(received_map[b'order_data003'].ordering_key, 'key2')
-    ack_ids = [msg.ack_id for msg in received_messages]
-    self.sub_client.acknowledge(
-        request={
-            'subscription': ordering_sub.name,
-            'ack_ids': ack_ids,
-        })
+    if ack_ids:
+      self.sub_client.acknowledge(
+          request={
+              'subscription': ordering_sub.name,
+              'ack_ids': ack_ids,
+          })
Review Comment:

If messages are acknowledged immediately inside the pull loop (as suggested in
the comment on the pull loop below), this post-loop acknowledgment block becomes
redundant and can be safely removed.
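To check the redundancy claim without a live subscription, the in-loop pattern
can be exercised against a fake client that records every acknowledgment. This
is a minimal sketch: `RecordingSubscriberClient` and `make_msg` are hypothetical
helpers that only imitate the `request=` dict calls and the `ack_id` attribute
used in the test; they are not the real `google-cloud-pubsub` `SubscriberClient`.

```python
from types import SimpleNamespace


class RecordingSubscriberClient:
  """Hypothetical fake mimicking the request-dict shape of the test's calls.

  It serves pre-built batches from `pull` and records every ack ID passed
  to `acknowledge`, so we can check nothing is left unacknowledged.
  """

  def __init__(self, batches):
    self._batches = list(batches)
    self.acked_ids = []
    self.ack_calls = 0

  def pull(self, request):
    batch = self._batches.pop(0) if self._batches else []
    return SimpleNamespace(received_messages=batch)

  def acknowledge(self, request):
    self.ack_calls += 1
    self.acked_ids.extend(request['ack_ids'])


def make_msg(ack_id):
  # Hypothetical stand-in for a received message; only ack_id is used here.
  return SimpleNamespace(ack_id=ack_id)


client = RecordingSubscriberClient([
    [make_msg('a0'), make_msg('a1')],
    [],  # An empty pull: no acknowledge request should be sent.
    [make_msg('a2')],
])
delivered_ack_ids = []
for _ in range(3):
  response = client.pull(request={'subscription': 'sub', 'max_messages': 10})
  batch_ids = [msg.ack_id for msg in response.received_messages]
  delivered_ack_ids.extend(batch_ids)
  if batch_ids:  # Same guard as `if response.received_messages:`.
    client.acknowledge(request={'subscription': 'sub', 'ack_ids': batch_ids})

# Every delivered ack ID was already acknowledged inside the loop, so a
# post-loop acknowledge would have nothing left to send.
leftover = [a for a in delivered_ack_ids if a not in client.acked_ids]
print(leftover, client.ack_calls)  # [] 2
```

Because each non-empty pull is acknowledged before the next one, the list of
delivered but unacknowledged ack IDs is empty when the loop exits, which is
exactly the work the post-loop block would otherwise do.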
##########
sdks/python/apache_beam/io/gcp/pubsub_integration_test.py:
##########
@@ -362,14 +362,22 @@ def test_batch_write_with_ordering_key(self):
     # Retry pulling to handle PubSub delivery delays
     received_messages = []
+    received_message_ids = set()
+    ack_ids = []
     deadline = time.time() + 60  # wait up to 60 seconds
     while time.time() < deadline:
       response = self.sub_client.pull(
           request={
               'subscription': ordering_sub.name,
               'max_messages': 10,
           })
-      received_messages.extend(response.received_messages)
+      for msg in response.received_messages:
+        ack_ids.append(msg.ack_id)
+        # Pub/Sub guarantees at-least-once delivery, so we must deduplicate
+        # messages by message_id to handle potential duplicate deliveries.
+        if msg.message.message_id not in received_message_ids:
+          received_message_ids.add(msg.message.message_id)
+          received_messages.append(msg)
       if len(received_messages) >= len(test_messages):
         break
       time.sleep(5)
Review Comment:

Acknowledging received messages immediately inside the pull loop is recommended.
The retry loop can run for up to 60 seconds, which exceeds the default Pub/Sub
acknowledgment deadline of 10 seconds, so any message that is still
unacknowledged when its deadline expires will be redelivered by Pub/Sub. This
causes unnecessary duplicate deliveries and can make the test flaky.
Acknowledging inside the loop avoids these redeliveries and simplifies the code,
because there is no longer any need to accumulate `ack_ids` and perform a bulk
acknowledgment at the end.
```python
deadline = time.time() + 60  # wait up to 60 seconds
while time.time() < deadline:
  response = self.sub_client.pull(
      request={
          'subscription': ordering_sub.name,
          'max_messages': 10,
      })
  if response.received_messages:
    # Ack right away so these messages are not redelivered when their ack
    # deadline expires while the retry loop is still running.
    self.sub_client.acknowledge(
        request={
            'subscription': ordering_sub.name,
            'ack_ids': [msg.ack_id for msg in response.received_messages],
        })
  for msg in response.received_messages:
    # Pub/Sub guarantees at-least-once delivery, so we must deduplicate
    # messages by message_id to handle potential duplicate deliveries.
    if msg.message.message_id not in received_message_ids:
      received_message_ids.add(msg.message.message_id)
      received_messages.append(msg)
  if len(received_messages) >= len(test_messages):
    break
  time.sleep(5)
```
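The redelivery argument can be illustrated with a small, self-contained
simulation that needs no Pub/Sub project. This is a sketch under stated
assumptions, not a model of the real service: `FakeSubscription` and
`run_pull_loop` are hypothetical helpers, leases expire after a fixed 10-second
ack deadline, the loop polls every 5 seconds with a 60-second budget (matching
the test), and the third message only becomes available 20 seconds in, which
forces the loop past the deadline.

```python
class FakeSubscription:
  """Hypothetical in-memory stand-in for a Pub/Sub subscription.

  A pulled message that is not acknowledged before its ack deadline
  expires becomes eligible for redelivery with a fresh ack ID.
  """

  def __init__(self, ack_deadline=10.0):
    self.ack_deadline = ack_deadline
    self._messages = {}  # message_id -> data, for unacknowledged messages
    self._lease_expiry = {}  # message_id -> time its current lease ends
    self._ack_ids = {}  # ack_id -> message_id
    self._next_ack = 0

  def publish(self, message_id, data):
    self._messages[message_id] = data

  def pull(self, now, max_messages=10):
    delivered = []
    for message_id, data in self._messages.items():
      if len(delivered) >= max_messages:
        break
      if now < self._lease_expiry.get(message_id, float('-inf')):
        continue  # Still leased to an earlier pull; not redelivered yet.
      self._lease_expiry[message_id] = now + self.ack_deadline
      ack_id = 'ack-%d' % self._next_ack
      self._next_ack += 1
      self._ack_ids[ack_id] = message_id
      delivered.append((ack_id, message_id, data))
    return delivered

  def acknowledge(self, ack_ids):
    for ack_id in ack_ids:
      message_id = self._ack_ids.pop(ack_id, None)
      if message_id is not None:
        self._messages.pop(message_id, None)
        self._lease_expiry.pop(message_id, None)


def run_pull_loop(arrivals, expected_count, ack_in_loop,
                  ack_deadline=10.0, timeout=60.0, poll_interval=5.0):
  """Simulates the test's retry loop; returns (unique data, deliveries)."""
  sub = FakeSubscription(ack_deadline)
  arrivals = sorted(arrivals)
  next_arrival = 0
  seen_ids, unique, deferred_ack_ids = set(), [], []
  deliveries = 0
  now = 0.0
  while now < timeout:
    # Publish messages whose simulated arrival time has passed.
    while next_arrival < len(arrivals) and arrivals[next_arrival][0] <= now:
      _, message_id, data = arrivals[next_arrival]
      sub.publish(message_id, data)
      next_arrival += 1
    batch = sub.pull(now)
    deliveries += len(batch)
    ack_ids = [ack_id for ack_id, _, _ in batch]
    if ack_in_loop:
      if ack_ids:
        sub.acknowledge(ack_ids)  # Ack immediately, as the review suggests.
    else:
      deferred_ack_ids.extend(ack_ids)  # Ack after the loop, as in the PR.
    for _, message_id, data in batch:
      # Deduplicate by message_id, because redeliveries repeat messages.
      if message_id not in seen_ids:
        seen_ids.add(message_id)
        unique.append(data)
    if len(unique) >= expected_count:
      break
    now += poll_interval
  if deferred_ack_ids:
    sub.acknowledge(deferred_ack_ids)
  return unique, deliveries


arrivals = [(0, 'm1', b'order_data001'), (0, 'm2', b'order_data002'),
            (20, 'm3', b'order_data003')]
late_payloads, late_deliveries = run_pull_loop(arrivals, 3, ack_in_loop=False)
eager_payloads, eager_deliveries = run_pull_loop(arrivals, 3, ack_in_loop=True)
print(late_deliveries, eager_deliveries)  # 7 3
```

With the deferred acknowledgment, the two early messages are redelivered each
time their lease lapses, so the loop handles 7 deliveries for 3 unique messages;
acknowledging inside the loop brings that down to 3. In both runs the
`message_id` check still yields the correct payloads, which is why keeping the
deduplication is worthwhile even after moving the acknowledgment: at-least-once
delivery can still produce occasional duplicates.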
--
This is an automated message from the Apache Git Service.