This is an automated email from the ASF dual-hosted git repository.
piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 77936cdb4 fix(integration): add retry logic to consumer group polling tests (#2385)
77936cdb4 is described below
commit 77936cdb44558d2d11e00f01fa7d68dcee8e29f7
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Thu Nov 20 21:09:19 2025 +0100
fix(integration): add retry logic to consumer group polling tests (#2385)
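Retry empty polls with a 1 ms delay between attempts to handle transient race
conditions during message polling in consumer group tests. Each polling helper
has a budget of at most 5 retries in total, shared across all of its polls;
once the budget is exhausted, the last (possibly empty) result is accepted.
This prevents spurious test failures when messages are temporarily
unavailable during journal-to-disk transitions.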
---
...h_multiple_clients_polling_messages_scenario.rs | 91 ++++++++++++++++------
1 file changed, 66 insertions(+), 25 deletions(-)
diff --git a/core/integration/tests/server/scenarios/consumer_group_with_multiple_clients_polling_messages_scenario.rs b/core/integration/tests/server/scenarios/consumer_group_with_multiple_clients_polling_messages_scenario.rs
index 44746bf32..6345061d2 100644
--- a/core/integration/tests/server/scenarios/consumer_group_with_multiple_clients_polling_messages_scenario.rs
+++ b/core/integration/tests/server/scenarios/consumer_group_with_multiple_clients_polling_messages_scenario.rs
@@ -132,19 +132,36 @@ async fn execute_using_messages_key_key(
async fn poll_messages(client: &IggyClient) -> u32 {
let consumer =
Consumer::group(Identifier::named(CONSUMER_GROUP_NAME).unwrap());
let mut total_read_messages_count = 0;
+ const MAX_RETRIES: u32 = 5;
+ let mut total_retries = 0;
+
for _ in 1..=PARTITIONS_COUNT * MESSAGES_COUNT {
- let polled_messages = client
- .poll_messages(
- &Identifier::named(STREAM_NAME).unwrap(),
- &Identifier::named(TOPIC_NAME).unwrap(),
- None,
- &consumer,
- &PollingStrategy::next(),
- 1,
- true,
- )
- .await
- .unwrap();
+ let mut retries = 0;
+ let polled_messages = loop {
+ let polled = client
+ .poll_messages(
+ &Identifier::named(STREAM_NAME).unwrap(),
+ &Identifier::named(TOPIC_NAME).unwrap(),
+ None,
+ &consumer,
+ &PollingStrategy::next(),
+ 1,
+ true,
+ )
+ .await
+ .unwrap();
+
+ // Accept any result after max retries to avoid infinite loop
+ if !polled.messages.is_empty() || retries >= MAX_RETRIES ||
total_retries >= MAX_RETRIES
+ {
+ break polled;
+ }
+
+ retries += 1;
+ total_retries += 1;
+ // Small delay before retry to allow any pending operations to complete
+ tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
+ };
total_read_messages_count += polled_messages.messages.len() as u32;
}
@@ -197,21 +214,45 @@ async fn execute_using_none_key(
async fn validate_message_polling(client: &IggyClient) {
let consumer =
Consumer::group(Identifier::named(CONSUMER_GROUP_NAME).unwrap());
+ const MAX_RETRIES: u32 = 5;
+ let mut total_retries = 0;
for i in 1..=MESSAGES_COUNT {
- let polled_messages = client
- .poll_messages(
- &Identifier::named(STREAM_NAME).unwrap(),
- &Identifier::named(TOPIC_NAME).unwrap(),
- None,
- &consumer,
- &PollingStrategy::next(),
- 1,
- true,
- )
- .await
- .unwrap();
- assert_eq!(polled_messages.messages.len(), 1);
+ let mut retries = 0;
+ let polled_messages = loop {
+ let polled = client
+ .poll_messages(
+ &Identifier::named(STREAM_NAME).unwrap(),
+ &Identifier::named(TOPIC_NAME).unwrap(),
+ None,
+ &consumer,
+ &PollingStrategy::next(),
+ 1,
+ true,
+ )
+ .await
+ .unwrap();
+
+ if !polled.messages.is_empty() || retries >= MAX_RETRIES ||
total_retries >= MAX_RETRIES
+ {
+ break polled;
+ }
+
+ retries += 1;
+ total_retries += 1;
+ // Small delay before retry to allow any pending operations to complete
+ tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
+ };
+
+ assert_eq!(
+ polled_messages.messages.len(),
+ 1,
+ "Expected 1 message at iteration {}, got {} (retries: {},
total_retries: {})",
+ i,
+ polled_messages.messages.len(),
+ retries,
+ total_retries
+ );
let message = &polled_messages.messages[0];
let offset = (i - 1) as u64;
assert_eq!(message.header.offset, offset);