This is an automated email from the ASF dual-hosted git repository.

piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new 77936cdb4 fix(integration): add retry logic to consumer group polling tests (#2385)
77936cdb4 is described below

commit 77936cdb44558d2d11e00f01fa7d68dcee8e29f7
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Thu Nov 20 21:09:19 2025 +0100

    fix(integration): add retry logic to consumer group polling tests (#2385)
    
    Add a bounded retry loop to the consumer group polling tests: when a poll
    returns no messages, it is retried after a 1 ms delay, with at most 5
    retries in total per polling helper call. This handles transient race
    conditions and prevents spurious test failures when messages are
    temporarily unavailable during journal-to-disk transitions.
---
 ...h_multiple_clients_polling_messages_scenario.rs | 91 ++++++++++++++++------
 1 file changed, 66 insertions(+), 25 deletions(-)

diff --git 
a/core/integration/tests/server/scenarios/consumer_group_with_multiple_clients_polling_messages_scenario.rs
 
b/core/integration/tests/server/scenarios/consumer_group_with_multiple_clients_polling_messages_scenario.rs
index 44746bf32..6345061d2 100644
--- 
a/core/integration/tests/server/scenarios/consumer_group_with_multiple_clients_polling_messages_scenario.rs
+++ 
b/core/integration/tests/server/scenarios/consumer_group_with_multiple_clients_polling_messages_scenario.rs
@@ -132,19 +132,36 @@ async fn execute_using_messages_key_key(
 async fn poll_messages(client: &IggyClient) -> u32 {
     let consumer = 
Consumer::group(Identifier::named(CONSUMER_GROUP_NAME).unwrap());
     let mut total_read_messages_count = 0;
+    const MAX_RETRIES: u32 = 5;
+    let mut total_retries = 0;
+
     for _ in 1..=PARTITIONS_COUNT * MESSAGES_COUNT {
-        let polled_messages = client
-            .poll_messages(
-                &Identifier::named(STREAM_NAME).unwrap(),
-                &Identifier::named(TOPIC_NAME).unwrap(),
-                None,
-                &consumer,
-                &PollingStrategy::next(),
-                1,
-                true,
-            )
-            .await
-            .unwrap();
+        let mut retries = 0;
+        let polled_messages = loop {
+            let polled = client
+                .poll_messages(
+                    &Identifier::named(STREAM_NAME).unwrap(),
+                    &Identifier::named(TOPIC_NAME).unwrap(),
+                    None,
+                    &consumer,
+                    &PollingStrategy::next(),
+                    1,
+                    true,
+                )
+                .await
+                .unwrap();
+
+            // Accept any result after max retries to avoid infinite loop
+            if !polled.messages.is_empty() || retries >= MAX_RETRIES || 
total_retries >= MAX_RETRIES
+            {
+                break polled;
+            }
+
+            retries += 1;
+            total_retries += 1;
+            // Small delay before retry to allow any pending operations to 
complete
+            tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
+        };
 
         total_read_messages_count += polled_messages.messages.len() as u32;
     }
@@ -197,21 +214,45 @@ async fn execute_using_none_key(
 
 async fn validate_message_polling(client: &IggyClient) {
     let consumer = 
Consumer::group(Identifier::named(CONSUMER_GROUP_NAME).unwrap());
+    const MAX_RETRIES: u32 = 5;
+    let mut total_retries = 0;
 
     for i in 1..=MESSAGES_COUNT {
-        let polled_messages = client
-            .poll_messages(
-                &Identifier::named(STREAM_NAME).unwrap(),
-                &Identifier::named(TOPIC_NAME).unwrap(),
-                None,
-                &consumer,
-                &PollingStrategy::next(),
-                1,
-                true,
-            )
-            .await
-            .unwrap();
-        assert_eq!(polled_messages.messages.len(), 1);
+        let mut retries = 0;
+        let polled_messages = loop {
+            let polled = client
+                .poll_messages(
+                    &Identifier::named(STREAM_NAME).unwrap(),
+                    &Identifier::named(TOPIC_NAME).unwrap(),
+                    None,
+                    &consumer,
+                    &PollingStrategy::next(),
+                    1,
+                    true,
+                )
+                .await
+                .unwrap();
+
+            if !polled.messages.is_empty() || retries >= MAX_RETRIES || 
total_retries >= MAX_RETRIES
+            {
+                break polled;
+            }
+
+            retries += 1;
+            total_retries += 1;
+            // Small delay before retry to allow any pending operations to 
complete
+            tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
+        };
+
+        assert_eq!(
+            polled_messages.messages.len(),
+            1,
+            "Expected 1 message at iteration {}, got {} (retries: {}, 
total_retries: {})",
+            i,
+            polled_messages.messages.len(),
+            retries,
+            total_retries
+        );
         let message = &polled_messages.messages[0];
         let offset = (i - 1) as u64;
         assert_eq!(message.header.offset, offset);

Reply via email to