This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch global-metadata-leftright
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 6588e1af68bb9d25f56aa5ec1766cfe81275642d
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Fri Jan 23 10:58:13 2026 +0100

    refactor concurrent scenario, check messages order
---
 .../tests/server/concurrent_addition.rs            |  15 +-
 .../tests/server/scenarios/concurrent_scenario.rs  | 560 ++++++++++++---------
 ...h_multiple_clients_polling_messages_scenario.rs |  60 ++-
 ...with_single_client_polling_messages_scenario.rs |  39 +-
 core/server/src/metadata/writer.rs                 |  12 +-
 5 files changed, 434 insertions(+), 252 deletions(-)

diff --git a/core/integration/tests/server/concurrent_addition.rs 
b/core/integration/tests/server/concurrent_addition.rs
index c63681340..a1618f6b8 100644
--- a/core/integration/tests/server/concurrent_addition.rs
+++ b/core/integration/tests/server/concurrent_addition.rs
@@ -30,16 +30,17 @@ use test_case::test_matrix;
 // Test matrix for race condition scenarios
 // Tests all combinations of:
 // - Transport: TCP, HTTP, QUIC, WebSocket (4)
-// - Resource: User, Stream, Topic (3)
+// - Resource: User, Stream, Topic, Partition, ConsumerGroup (5)
 // - Path: Hot (unique names), Cold (duplicate names) (2)
 // - Barrier: On (synchronized), Off (unsynchronized) (2)
-// Total: 4 × 3 × 2 × 2 = 48 test cases
+// Total: 4 × 5 × 2 × 2 = 80 test cases
+// Note: Partition + Cold is skipped (partitions don't have names)
 
 // TODO: Websocket fails for the `cold` type, cold means that we are creating 
resources with the same name.
 // It fails with the error assertion, instead of `AlreadyExist`, we get 
generic `Error`.
 #[test_matrix(
     [tcp(), http(), quic(), websocket()],
-    [user(), stream(), topic()],
+    [user(), stream(), topic(), partition(), consumer_group()],
     [hot(), cold()],
     [barrier_on(), barrier_off()]
 )]
@@ -119,6 +120,14 @@ fn topic() -> ResourceType {
     ResourceType::Topic
 }
 
+fn partition() -> ResourceType {
+    ResourceType::Partition
+}
+
+fn consumer_group() -> ResourceType {
+    ResourceType::ConsumerGroup
+}
+
 fn hot() -> ScenarioType {
     ScenarioType::Hot
 }
diff --git a/core/integration/tests/server/scenarios/concurrent_scenario.rs 
b/core/integration/tests/server/scenarios/concurrent_scenario.rs
index 5140b42b6..d42296ba2 100644
--- a/core/integration/tests/server/scenarios/concurrent_scenario.rs
+++ b/core/integration/tests/server/scenarios/concurrent_scenario.rs
@@ -19,19 +19,16 @@
 use crate::server::scenarios::create_client;
 use futures::future::join_all;
 use iggy::prelude::*;
-use iggy_common::UserInfo;
+use iggy_common::{ConsumerGroup, UserInfo};
 use integration::test_server::{ClientFactory, login_root};
 use std::sync::Arc;
 use tokio::sync::Barrier;
 
-const OPERATIONS_COUNT: usize = 40;
-const MULTIPLE_CLIENT_COUNT: usize = 10;
-const OPERATIONS_PER_CLIENT: usize = OPERATIONS_COUNT / MULTIPLE_CLIENT_COUNT;
+const CONCURRENT_CLIENTS: usize = 20;
 const USER_PASSWORD: &str = "secret";
 const TEST_STREAM_NAME: &str = "race-test-stream";
 const TEST_TOPIC_NAME: &str = "race-test-topic";
 const PARTITIONS_COUNT: u32 = 1;
-const PARTITIONS_TO_ADD: u32 = 1;
 
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
 pub enum ResourceType {
@@ -39,7 +36,7 @@ pub enum ResourceType {
     Stream,
     Topic,
     Partition,
-    // TODO(hubcio): add ConsumerGroup
+    ConsumerGroup,
 }
 
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -72,8 +69,8 @@ pub async fn run(
         root_client.create_stream(TEST_STREAM_NAME).await.unwrap();
     }
 
-    // For partition tests, create parent stream and topic first
-    if resource_type == ResourceType::Partition {
+    // For partition and consumer group tests, create parent stream and topic 
first
+    if resource_type == ResourceType::Partition || resource_type == 
ResourceType::ConsumerGroup {
         root_client.create_stream(TEST_STREAM_NAME).await.unwrap();
         let stream_id = Identifier::named(TEST_STREAM_NAME).unwrap();
         root_client
@@ -91,29 +88,35 @@ pub async fn run(
     }
 
     let results = match (resource_type, scenario_type) {
-        (ResourceType::User, ScenarioType::Cold) => {
-            execute_multiple_clients_users_cold(client_factory, 
use_barrier).await
-        }
         (ResourceType::User, ScenarioType::Hot) => {
-            execute_multiple_clients_users_hot(client_factory, 
use_barrier).await
+            execute_users_hot(client_factory, use_barrier).await
+        }
+        (ResourceType::User, ScenarioType::Cold) => {
+            execute_users_cold(client_factory, use_barrier).await
         }
         (ResourceType::Stream, ScenarioType::Hot) => {
-            execute_multiple_clients_streams_hot(client_factory, 
use_barrier).await
+            execute_streams_hot(client_factory, use_barrier).await
         }
         (ResourceType::Stream, ScenarioType::Cold) => {
-            execute_multiple_clients_streams_cold(client_factory, 
use_barrier).await
+            execute_streams_cold(client_factory, use_barrier).await
         }
         (ResourceType::Topic, ScenarioType::Hot) => {
-            execute_multiple_clients_topics_hot(client_factory, 
use_barrier).await
+            execute_topics_hot(client_factory, use_barrier).await
         }
         (ResourceType::Topic, ScenarioType::Cold) => {
-            execute_multiple_clients_topics_cold(client_factory, 
use_barrier).await
+            execute_topics_cold(client_factory, use_barrier).await
         }
         (ResourceType::Partition, ScenarioType::Hot) => {
-            execute_multiple_clients_partitions_hot(client_factory, 
use_barrier).await
+            execute_partitions_hot(client_factory, use_barrier).await
         }
         // Partitions don't have names, so Cold scenario doesn't apply
         (ResourceType::Partition, ScenarioType::Cold) => vec![],
+        (ResourceType::ConsumerGroup, ScenarioType::Hot) => {
+            execute_consumer_groups_hot(client_factory, use_barrier).await
+        }
+        (ResourceType::ConsumerGroup, ScenarioType::Cold) => {
+            execute_consumer_groups_cold(client_factory, use_barrier).await
+        }
     };
 
     if !results.is_empty() {
@@ -123,320 +126,299 @@ pub async fn run(
     }
 }
 
-async fn execute_multiple_clients_users_hot(
+async fn execute_users_hot(
     client_factory: &dyn ClientFactory,
     use_barrier: bool,
 ) -> Vec<OperationResult> {
-    let mut handles = Vec::with_capacity(MULTIPLE_CLIENT_COUNT);
-    let barrier = if use_barrier {
-        Some(Arc::new(Barrier::new(MULTIPLE_CLIENT_COUNT)))
-    } else {
-        None
-    };
+    let barrier = use_barrier.then(|| 
Arc::new(Barrier::new(CONCURRENT_CLIENTS)));
 
-    for client_id in 0..MULTIPLE_CLIENT_COUNT {
+    let mut handles = Vec::with_capacity(CONCURRENT_CLIENTS);
+    for client_id in 0..CONCURRENT_CLIENTS {
         let client = create_client(client_factory).await;
         login_root(&client).await;
+        let barrier = barrier.clone();
 
-        let barrier_clone = barrier.clone();
-        let handle = tokio::spawn(async move {
-            if let Some(b) = barrier_clone {
+        handles.push(tokio::spawn(async move {
+            if let Some(b) = barrier {
                 b.wait().await;
             }
-
-            let mut results = Vec::with_capacity(OPERATIONS_PER_CLIENT);
-            for i in 0..OPERATIONS_PER_CLIENT {
-                let username = format!("race-user-{}-{}", client_id, i);
-                let result = client
-                    .create_user(&username, USER_PASSWORD, UserStatus::Active, 
None)
-                    .await
-                    .map(|_| ());
-                results.push(result);
-            }
-            results
-        });
-
-        handles.push(handle);
+            let username = format!("race-user-{}", client_id);
+            client
+                .create_user(&username, USER_PASSWORD, UserStatus::Active, 
None)
+                .await
+                .map(|_| ())
+        }));
     }
 
-    let all_results = join_all(handles).await;
-    all_results
+    join_all(handles)
+        .await
         .into_iter()
-        .flat_map(|r| r.expect("Tokio task panicked"))
+        .map(|r| r.expect("task panicked"))
         .collect()
 }
 
-async fn execute_multiple_clients_users_cold(
+async fn execute_users_cold(
     client_factory: &dyn ClientFactory,
     use_barrier: bool,
 ) -> Vec<OperationResult> {
-    let mut handles = Vec::with_capacity(MULTIPLE_CLIENT_COUNT);
     const DUPLICATE_USER: &str = "race-user-duplicate";
-    let barrier = if use_barrier {
-        Some(Arc::new(Barrier::new(MULTIPLE_CLIENT_COUNT)))
-    } else {
-        None
-    };
+    let barrier = use_barrier.then(|| 
Arc::new(Barrier::new(CONCURRENT_CLIENTS)));
 
-    for _ in 0..MULTIPLE_CLIENT_COUNT {
+    let mut handles = Vec::with_capacity(CONCURRENT_CLIENTS);
+    for _ in 0..CONCURRENT_CLIENTS {
         let client = create_client(client_factory).await;
         login_root(&client).await;
+        let barrier = barrier.clone();
 
-        let barrier_clone = barrier.clone();
-        let handle = tokio::spawn(async move {
-            if let Some(b) = barrier_clone {
+        handles.push(tokio::spawn(async move {
+            if let Some(b) = barrier {
                 b.wait().await;
             }
-
-            let mut results = Vec::with_capacity(MULTIPLE_CLIENT_COUNT);
-            for _ in 0..OPERATIONS_PER_CLIENT {
-                let result = client
-                    .create_user(DUPLICATE_USER, USER_PASSWORD, 
UserStatus::Active, None)
-                    .await
-                    .map(|_| ());
-                results.push(result);
-            }
-            results
-        });
-
-        handles.push(handle);
+            client
+                .create_user(DUPLICATE_USER, USER_PASSWORD, 
UserStatus::Active, None)
+                .await
+                .map(|_| ())
+        }));
     }
 
-    let all_results = join_all(handles).await;
-    all_results
+    join_all(handles)
+        .await
         .into_iter()
-        .flat_map(|r| r.expect("Tokio task panicked"))
+        .map(|r| r.expect("task panicked"))
         .collect()
 }
 
-async fn execute_multiple_clients_streams_hot(
+async fn execute_streams_hot(
     client_factory: &dyn ClientFactory,
     use_barrier: bool,
 ) -> Vec<OperationResult> {
-    let mut handles = Vec::with_capacity(MULTIPLE_CLIENT_COUNT);
-    let barrier = if use_barrier {
-        Some(Arc::new(Barrier::new(MULTIPLE_CLIENT_COUNT)))
-    } else {
-        None
-    };
+    let barrier = use_barrier.then(|| 
Arc::new(Barrier::new(CONCURRENT_CLIENTS)));
 
-    for client_id in 0..MULTIPLE_CLIENT_COUNT {
+    let mut handles = Vec::with_capacity(CONCURRENT_CLIENTS);
+    for client_id in 0..CONCURRENT_CLIENTS {
         let client = create_client(client_factory).await;
         login_root(&client).await;
+        let barrier = barrier.clone();
 
-        let barrier_clone = barrier.clone();
-        let handle = tokio::spawn(async move {
-            if let Some(b) = barrier_clone {
+        handles.push(tokio::spawn(async move {
+            if let Some(b) = barrier {
                 b.wait().await;
             }
-
-            let mut results = Vec::with_capacity(OPERATIONS_PER_CLIENT);
-            for i in 0..OPERATIONS_PER_CLIENT {
-                let stream_name = format!("race-stream-{}-{}", client_id, i);
-                let result = client.create_stream(&stream_name).await.map(|_| 
());
-                results.push(result);
-            }
-            results
-        });
-
-        handles.push(handle);
+            let stream_name = format!("race-stream-{}", client_id);
+            client.create_stream(&stream_name).await.map(|_| ())
+        }));
     }
 
-    let all_results = join_all(handles).await;
-    all_results
+    join_all(handles)
+        .await
         .into_iter()
-        .flat_map(|r| r.expect("Tokio task panicked"))
+        .map(|r| r.expect("task panicked"))
         .collect()
 }
 
-async fn execute_multiple_clients_streams_cold(
+async fn execute_streams_cold(
     client_factory: &dyn ClientFactory,
     use_barrier: bool,
 ) -> Vec<OperationResult> {
-    let mut handles = Vec::with_capacity(MULTIPLE_CLIENT_COUNT);
     const DUPLICATE_STREAM: &str = "race-stream-duplicate";
-    let barrier = if use_barrier {
-        Some(Arc::new(Barrier::new(MULTIPLE_CLIENT_COUNT)))
-    } else {
-        None
-    };
+    let barrier = use_barrier.then(|| 
Arc::new(Barrier::new(CONCURRENT_CLIENTS)));
 
-    for _ in 0..MULTIPLE_CLIENT_COUNT {
+    let mut handles = Vec::with_capacity(CONCURRENT_CLIENTS);
+    for _ in 0..CONCURRENT_CLIENTS {
         let client = create_client(client_factory).await;
         login_root(&client).await;
+        let barrier = barrier.clone();
 
-        let barrier_clone = barrier.clone();
-        let handle = tokio::spawn(async move {
-            if let Some(b) = barrier_clone {
+        handles.push(tokio::spawn(async move {
+            if let Some(b) = barrier {
                 b.wait().await;
             }
-
-            let mut results = Vec::with_capacity(OPERATIONS_PER_CLIENT);
-            for _ in 0..OPERATIONS_PER_CLIENT {
-                let result = 
client.create_stream(DUPLICATE_STREAM).await.map(|_| ());
-                results.push(result);
-            }
-            results
-        });
-
-        handles.push(handle);
+            client.create_stream(DUPLICATE_STREAM).await.map(|_| ())
+        }));
     }
 
-    let all_results = join_all(handles).await;
-    all_results
+    join_all(handles)
+        .await
         .into_iter()
-        .flat_map(|r| r.expect("Tokio task panicked"))
+        .map(|r| r.expect("task panicked"))
         .collect()
 }
 
-async fn execute_multiple_clients_topics_hot(
+async fn execute_topics_hot(
     client_factory: &dyn ClientFactory,
     use_barrier: bool,
 ) -> Vec<OperationResult> {
-    let mut handles = Vec::with_capacity(MULTIPLE_CLIENT_COUNT);
-    let barrier = if use_barrier {
-        Some(Arc::new(Barrier::new(MULTIPLE_CLIENT_COUNT)))
-    } else {
-        None
-    };
+    let barrier = use_barrier.then(|| 
Arc::new(Barrier::new(CONCURRENT_CLIENTS)));
 
-    for client_id in 0..MULTIPLE_CLIENT_COUNT {
+    let mut handles = Vec::with_capacity(CONCURRENT_CLIENTS);
+    for client_id in 0..CONCURRENT_CLIENTS {
         let client = create_client(client_factory).await;
         login_root(&client).await;
+        let barrier = barrier.clone();
 
-        let barrier_clone = barrier.clone();
-        let handle = tokio::spawn(async move {
-            if let Some(b) = barrier_clone {
+        handles.push(tokio::spawn(async move {
+            if let Some(b) = barrier {
                 b.wait().await;
             }
-
-            let mut results = Vec::with_capacity(OPERATIONS_PER_CLIENT);
             let stream_id = Identifier::named(TEST_STREAM_NAME).unwrap();
-
-            for i in 0..OPERATIONS_PER_CLIENT {
-                let topic_name = format!("race-topic-{}-{}", client_id, i);
-                let result = client
-                    .create_topic(
-                        &stream_id,
-                        &topic_name,
-                        PARTITIONS_COUNT,
-                        CompressionAlgorithm::default(),
-                        None,
-                        IggyExpiry::NeverExpire,
-                        MaxTopicSize::ServerDefault,
-                    )
-                    .await
-                    .map(|_| ());
-                results.push(result);
-            }
-            results
-        });
-
-        handles.push(handle);
+            let topic_name = format!("race-topic-{}", client_id);
+            client
+                .create_topic(
+                    &stream_id,
+                    &topic_name,
+                    PARTITIONS_COUNT,
+                    CompressionAlgorithm::default(),
+                    None,
+                    IggyExpiry::NeverExpire,
+                    MaxTopicSize::ServerDefault,
+                )
+                .await
+                .map(|_| ())
+        }));
     }
 
-    let all_results = join_all(handles).await;
-    all_results
+    join_all(handles)
+        .await
         .into_iter()
-        .flat_map(|r| r.expect("Tokio task panicked"))
+        .map(|r| r.expect("task panicked"))
         .collect()
 }
 
-async fn execute_multiple_clients_topics_cold(
+async fn execute_topics_cold(
     client_factory: &dyn ClientFactory,
     use_barrier: bool,
 ) -> Vec<OperationResult> {
-    let mut handles = Vec::with_capacity(MULTIPLE_CLIENT_COUNT);
     const DUPLICATE_TOPIC: &str = "race-topic-duplicate";
-    let barrier = if use_barrier {
-        Some(Arc::new(Barrier::new(MULTIPLE_CLIENT_COUNT)))
-    } else {
-        None
-    };
+    let barrier = use_barrier.then(|| 
Arc::new(Barrier::new(CONCURRENT_CLIENTS)));
 
-    for _ in 0..MULTIPLE_CLIENT_COUNT {
+    let mut handles = Vec::with_capacity(CONCURRENT_CLIENTS);
+    for _ in 0..CONCURRENT_CLIENTS {
         let client = create_client(client_factory).await;
         login_root(&client).await;
+        let barrier = barrier.clone();
 
-        let barrier_clone = barrier.clone();
-        let handle = tokio::spawn(async move {
-            if let Some(b) = barrier_clone {
+        handles.push(tokio::spawn(async move {
+            if let Some(b) = barrier {
                 b.wait().await;
             }
-
-            let mut results = Vec::with_capacity(OPERATIONS_PER_CLIENT);
             let stream_id = Identifier::named(TEST_STREAM_NAME).unwrap();
+            client
+                .create_topic(
+                    &stream_id,
+                    DUPLICATE_TOPIC,
+                    PARTITIONS_COUNT,
+                    CompressionAlgorithm::default(),
+                    None,
+                    IggyExpiry::NeverExpire,
+                    MaxTopicSize::ServerDefault,
+                )
+                .await
+                .map(|_| ())
+        }));
+    }
 
-            for _ in 0..OPERATIONS_PER_CLIENT {
-                let result = client
-                    .create_topic(
-                        &stream_id,
-                        DUPLICATE_TOPIC,
-                        PARTITIONS_COUNT,
-                        CompressionAlgorithm::default(),
-                        None,
-                        IggyExpiry::NeverExpire,
-                        MaxTopicSize::ServerDefault,
-                    )
-                    .await
-                    .map(|_| ());
-                results.push(result);
-            }
-            results
-        });
+    join_all(handles)
+        .await
+        .into_iter()
+        .map(|r| r.expect("task panicked"))
+        .collect()
+}
 
-        handles.push(handle);
+async fn execute_partitions_hot(
+    client_factory: &dyn ClientFactory,
+    use_barrier: bool,
+) -> Vec<OperationResult> {
+    let barrier = use_barrier.then(|| 
Arc::new(Barrier::new(CONCURRENT_CLIENTS)));
+
+    let mut handles = Vec::with_capacity(CONCURRENT_CLIENTS);
+    for _ in 0..CONCURRENT_CLIENTS {
+        let client = create_client(client_factory).await;
+        login_root(&client).await;
+        let barrier = barrier.clone();
+
+        handles.push(tokio::spawn(async move {
+            if let Some(b) = barrier {
+                b.wait().await;
+            }
+            let stream_id = Identifier::named(TEST_STREAM_NAME).unwrap();
+            let topic_id = Identifier::named(TEST_TOPIC_NAME).unwrap();
+            client
+                .create_partitions(&stream_id, &topic_id, 1)
+                .await
+                .map(|_| ())
+        }));
     }
 
-    let all_results = join_all(handles).await;
-    all_results
+    join_all(handles)
+        .await
         .into_iter()
-        .flat_map(|r| r.expect("Tokio task panicked"))
+        .map(|r| r.expect("task panicked"))
         .collect()
 }
 
-async fn execute_multiple_clients_partitions_hot(
+async fn execute_consumer_groups_hot(
     client_factory: &dyn ClientFactory,
     use_barrier: bool,
 ) -> Vec<OperationResult> {
-    let mut handles = Vec::with_capacity(MULTIPLE_CLIENT_COUNT);
-    let barrier = if use_barrier {
-        Some(Arc::new(Barrier::new(MULTIPLE_CLIENT_COUNT)))
-    } else {
-        None
-    };
+    let barrier = use_barrier.then(|| 
Arc::new(Barrier::new(CONCURRENT_CLIENTS)));
 
-    for _ in 0..MULTIPLE_CLIENT_COUNT {
+    let mut handles = Vec::with_capacity(CONCURRENT_CLIENTS);
+    for client_id in 0..CONCURRENT_CLIENTS {
         let client = create_client(client_factory).await;
         login_root(&client).await;
+        let barrier = barrier.clone();
 
-        let barrier_clone = barrier.clone();
-        let handle = tokio::spawn(async move {
-            if let Some(b) = barrier_clone {
+        handles.push(tokio::spawn(async move {
+            if let Some(b) = barrier {
                 b.wait().await;
             }
-
-            let mut results = Vec::with_capacity(OPERATIONS_PER_CLIENT);
             let stream_id = Identifier::named(TEST_STREAM_NAME).unwrap();
             let topic_id = Identifier::named(TEST_TOPIC_NAME).unwrap();
+            let group_name = format!("race-consumer-group-{}", client_id);
+            client
+                .create_consumer_group(&stream_id, &topic_id, &group_name)
+                .await
+                .map(|_| ())
+        }));
+    }
 
-            for _ in 0..OPERATIONS_PER_CLIENT {
-                let result = client
-                    .create_partitions(&stream_id, &topic_id, 
PARTITIONS_TO_ADD)
-                    .await
-                    .map(|_| ());
-                results.push(result);
-            }
-            results
-        });
+    join_all(handles)
+        .await
+        .into_iter()
+        .map(|r| r.expect("task panicked"))
+        .collect()
+}
+
+async fn execute_consumer_groups_cold(
+    client_factory: &dyn ClientFactory,
+    use_barrier: bool,
+) -> Vec<OperationResult> {
+    const DUPLICATE_CONSUMER_GROUP: &str = "race-consumer-group-duplicate";
+    let barrier = use_barrier.then(|| 
Arc::new(Barrier::new(CONCURRENT_CLIENTS)));
 
-        handles.push(handle);
+    let mut handles = Vec::with_capacity(CONCURRENT_CLIENTS);
+    for _ in 0..CONCURRENT_CLIENTS {
+        let client = create_client(client_factory).await;
+        login_root(&client).await;
+        let barrier = barrier.clone();
+
+        handles.push(tokio::spawn(async move {
+            if let Some(b) = barrier {
+                b.wait().await;
+            }
+            let stream_id = Identifier::named(TEST_STREAM_NAME).unwrap();
+            let topic_id = Identifier::named(TEST_TOPIC_NAME).unwrap();
+            client
+                .create_consumer_group(&stream_id, &topic_id, 
DUPLICATE_CONSUMER_GROUP)
+                .await
+                .map(|_| ())
+        }));
     }
 
-    let all_results = join_all(handles).await;
-    all_results
+    join_all(handles)
+        .await
         .into_iter()
-        .flat_map(|r| r.expect("Tokio task panicked"))
+        .map(|r| r.expect("task panicked"))
         .collect()
 }
 
@@ -449,9 +431,9 @@ fn validate_results(results: &[OperationResult], 
scenario_type: ScenarioType) {
             // Hot path: all operations should succeed
             assert_eq!(
                 success_count,
-                OPERATIONS_COUNT,
+                CONCURRENT_CLIENTS,
                 "Hot path: Expected all {} operations to succeed, but only {} 
succeeded. Errors: {:?}",
-                OPERATIONS_COUNT,
+                CONCURRENT_CLIENTS,
                 success_count,
                 results.iter().filter(|r| r.is_err()).collect::<Vec<_>>()
             );
@@ -470,9 +452,9 @@ fn validate_results(results: &[OperationResult], 
scenario_type: ScenarioType) {
             );
             assert_eq!(
                 error_count,
-                OPERATIONS_COUNT - 1,
+                CONCURRENT_CLIENTS - 1,
                 "Cold path: Expected {} errors, but got {}",
-                OPERATIONS_COUNT - 1,
+                CONCURRENT_CLIENTS - 1,
                 error_count
             );
 
@@ -485,6 +467,7 @@ fn validate_results(results: &[OperationResult], 
scenario_type: ScenarioType) {
                         IggyError::UserAlreadyExists
                             | IggyError::StreamNameAlreadyExists(_)
                             | IggyError::TopicNameAlreadyExists(_, _)
+                            | IggyError::ConsumerGroupNameAlreadyExists(_, _)
                             | IggyError::HttpResponseError(400, _)
                     ),
                     "Expected 'already exists' error, got: {:?}",
@@ -624,6 +607,36 @@ async fn validate_server_state(
                 );
             }
         }
+        ResourceType::ConsumerGroup => {
+            let mut group_states = Vec::new();
+            for client in &clients {
+                let state = validate_consumer_groups_state(client, 
scenario_type).await;
+                group_states.push(state);
+            }
+
+            let first_state = &group_states[0];
+            for (i, state) in group_states.iter().enumerate().skip(1) {
+                assert_eq!(
+                    state.len(),
+                    first_state.len(),
+                    "Client {} sees different number of consumer groups ({}) 
than client 0 ({})",
+                    i,
+                    state.len(),
+                    first_state.len()
+                );
+
+                let mut first_names: Vec<_> = first_state.iter().map(|g| 
&g.name).collect();
+                let mut current_names: Vec<_> = state.iter().map(|g| 
&g.name).collect();
+                first_names.sort();
+                current_names.sort();
+
+                assert_eq!(
+                    current_names, first_names,
+                    "Client {} sees different consumer groups than client 0",
+                    i
+                );
+            }
+        }
     }
 }
 
@@ -636,12 +649,12 @@ async fn validate_users_state(client: &IggyClient, 
scenario_type: ScenarioType)
 
     match scenario_type {
         ScenarioType::Hot => {
-            // Hot path: should have OPERATIONS_COUNT unique users
+            // Hot path: should have CONCURRENT_CLIENTS unique users
             assert_eq!(
                 test_users.len(),
-                OPERATIONS_COUNT,
+                CONCURRENT_CLIENTS,
                 "Hot path: Expected {} users, but found {}. Users: {:?}",
-                OPERATIONS_COUNT,
+                CONCURRENT_CLIENTS,
                 test_users.len(),
                 test_users.iter().map(|u| &u.username).collect::<Vec<_>>()
             );
@@ -697,12 +710,12 @@ async fn validate_streams_state(client: &IggyClient, 
scenario_type: ScenarioType
 
     match scenario_type {
         ScenarioType::Hot => {
-            // Hot path: should have OPERATIONS_COUNT unique streams
+            // Hot path: should have CONCURRENT_CLIENTS unique streams
             assert_eq!(
                 test_streams.len(),
-                OPERATIONS_COUNT,
+                CONCURRENT_CLIENTS,
                 "Hot path: Expected {} streams, but found {}. Streams: {:?}",
-                OPERATIONS_COUNT,
+                CONCURRENT_CLIENTS,
                 test_streams.len(),
                 test_streams.iter().map(|s| &s.name).collect::<Vec<_>>()
             );
@@ -765,12 +778,12 @@ async fn validate_topics_state(client: &IggyClient, 
scenario_type: ScenarioType)
 
     match scenario_type {
         ScenarioType::Hot => {
-            // Hot path: should have OPERATIONS_COUNT unique topics
+            // Hot path: should have CONCURRENT_CLIENTS unique topics
             assert_eq!(
                 test_topics.len(),
-                OPERATIONS_COUNT,
+                CONCURRENT_CLIENTS,
                 "Hot path: Expected {} topics, but found {}. Topics: {:?}",
-                OPERATIONS_COUNT,
+                CONCURRENT_CLIENTS,
                 test_topics.len(),
                 test_topics.iter().map(|t| &t.name).collect::<Vec<_>>()
             );
@@ -826,17 +839,82 @@ async fn validate_partitions_state(client: &IggyClient) 
-> u32 {
         .expect("Failed to get test topic")
         .expect("Test topic not found");
 
-    // Expected: initial partition + OPERATIONS_COUNT added partitions
-    let expected_partitions = PARTITIONS_COUNT + (OPERATIONS_COUNT as u32 * 
PARTITIONS_TO_ADD);
+    // Expected: initial partition + CONCURRENT_CLIENTS added partitions (1 
each)
+    let expected_partitions = PARTITIONS_COUNT + CONCURRENT_CLIENTS as u32;
     assert_eq!(
         topic.partitions_count, expected_partitions,
-        "Hot path: Expected {} partitions (1 initial + {} added), but found 
{}",
-        expected_partitions, OPERATIONS_COUNT, topic.partitions_count
+        "Expected {} partitions (1 initial + {} added), but found {}",
+        expected_partitions, CONCURRENT_CLIENTS, topic.partitions_count
     );
 
     topic.partitions_count
 }
 
+async fn validate_consumer_groups_state(
+    client: &IggyClient,
+    scenario_type: ScenarioType,
+) -> Vec<ConsumerGroup> {
+    let stream_id = Identifier::named(TEST_STREAM_NAME).unwrap();
+    let topic_id = Identifier::named(TEST_TOPIC_NAME).unwrap();
+    let groups = client
+        .get_consumer_groups(&stream_id, &topic_id)
+        .await
+        .expect("Failed to get consumer groups");
+    let test_groups: Vec<_> = groups
+        .into_iter()
+        .filter(|g| g.name.starts_with("race-consumer-group-"))
+        .collect();
+
+    match scenario_type {
+        ScenarioType::Hot => {
+            assert_eq!(
+                test_groups.len(),
+                CONCURRENT_CLIENTS,
+                "Hot path: Expected {} consumer groups, but found {}. Groups: 
{:?}",
+                CONCURRENT_CLIENTS,
+                test_groups.len(),
+                test_groups.iter().map(|g| &g.name).collect::<Vec<_>>()
+            );
+
+            let mut ids: Vec<u32> = test_groups.iter().map(|g| g.id).collect();
+            ids.sort_unstable();
+            let unique_ids: std::collections::HashSet<u32> = 
ids.iter().copied().collect();
+            assert_eq!(
+                unique_ids.len(),
+                test_groups.len(),
+                "Hot path: Found duplicate consumer group IDs: {:?}",
+                ids
+            );
+
+            let names: std::collections::HashSet<&str> =
+                test_groups.iter().map(|g| g.name.as_str()).collect();
+            assert_eq!(
+                names.len(),
+                test_groups.len(),
+                "Hot path: Found duplicate consumer group names"
+            );
+        }
+        ScenarioType::Cold => {
+            assert_eq!(
+                test_groups.len(),
+                1,
+                "Cold path: Expected exactly 1 consumer group, but found {}. 
Groups: {:?}",
+                test_groups.len(),
+                test_groups.iter().map(|g| &g.name).collect::<Vec<_>>()
+            );
+
+            let group = &test_groups[0];
+            assert_eq!(
+                group.name, "race-consumer-group-duplicate",
+                "Cold path: Expected consumer group named 
'race-consumer-group-duplicate', found '{}'",
+                group.name
+            );
+        }
+    }
+
+    test_groups
+}
+
 async fn cleanup_resources(client: &IggyClient, resource_type: ResourceType) {
     match resource_type {
         ResourceType::User => {
@@ -852,13 +930,15 @@ async fn cleanup_resources(client: &IggyClient, 
resource_type: ResourceType) {
         ResourceType::Stream => {
             let streams = client.get_streams().await.unwrap();
             for stream in streams {
-                let _ = client
-                    .delete_stream(&Identifier::numeric(stream.id).unwrap())
-                    .await;
+                if stream.name.starts_with("race-stream-") {
+                    let _ = client
+                        
.delete_stream(&Identifier::numeric(stream.id).unwrap())
+                        .await;
+                }
             }
         }
-        // Just delete test stream (also cleans up topics and partitions)
-        ResourceType::Topic | ResourceType::Partition => {
+        // Just delete test stream (also cleans up topics, partitions, and 
consumer groups)
+        ResourceType::Topic | ResourceType::Partition | 
ResourceType::ConsumerGroup => {
             let _ = client
                 .delete_stream(&Identifier::named(TEST_STREAM_NAME).unwrap())
                 .await;
diff --git 
a/core/integration/tests/server/scenarios/consumer_group_with_multiple_clients_polling_messages_scenario.rs
 
b/core/integration/tests/server/scenarios/consumer_group_with_multiple_clients_polling_messages_scenario.rs
index 6345061d2..076ecae25 100644
--- 
a/core/integration/tests/server/scenarios/consumer_group_with_multiple_clients_polling_messages_scenario.rs
+++ 
b/core/integration/tests/server/scenarios/consumer_group_with_multiple_clients_polling_messages_scenario.rs
@@ -24,7 +24,7 @@ use iggy::prelude::*;
 use integration::test_server::{
     ClientFactory, assert_clean_system, create_user, login_root, login_user,
 };
-use std::str::FromStr;
+use std::str::{FromStr, from_utf8};
 
 pub async fn run(client_factory: &dyn ClientFactory) {
     let system_client = create_client(client_factory).await;
@@ -132,6 +132,7 @@ async fn execute_using_messages_key_key(
 async fn poll_messages(client: &IggyClient) -> u32 {
     let consumer = 
Consumer::group(Identifier::named(CONSUMER_GROUP_NAME).unwrap());
     let mut total_read_messages_count = 0;
+    let mut last_entity_id: Option<u32> = None;
     const MAX_RETRIES: u32 = 5;
     let mut total_retries = 0;
 
@@ -163,12 +164,42 @@ async fn poll_messages(client: &IggyClient) -> u32 {
             tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
         };
 
-        total_read_messages_count += polled_messages.messages.len() as u32;
+        if polled_messages.messages.is_empty() {
+            if total_retries >= MAX_RETRIES {
+                break;
+            }
+            continue;
+        }
+
+        let message = &polled_messages.messages[0];
+        let payload = from_utf8(&message.payload).unwrap();
+        let entity_id = parse_entity_id_from_payload(payload);
+
+        // Each client polls from a single partition; messages should arrive 
in send order
+        if let Some(last) = last_entity_id {
+            assert!(
+                entity_id > last,
+                "Messages out of order: {} should be > {}",
+                entity_id,
+                last
+            );
+        }
+        last_entity_id = Some(entity_id);
+
+        total_read_messages_count += 1;
     }
 
     total_read_messages_count
 }
 
+fn parse_entity_id_from_payload(payload: &str) -> u32 {
+    payload
+        .strip_prefix("message-")
+        .expect("payload should start with 'message-'")
+        .parse()
+        .expect("entity_id should be a valid u32")
+}
+
 fn create_message_payload(entity_id: u32) -> String {
     format!("message-{entity_id}")
 }
@@ -214,6 +245,7 @@ async fn execute_using_none_key(
 
 async fn validate_message_polling(client: &IggyClient) {
     let consumer = 
Consumer::group(Identifier::named(CONSUMER_GROUP_NAME).unwrap());
+    let mut partition_id: Option<u32> = None;
     const MAX_RETRIES: u32 = 5;
     let mut total_retries = 0;
 
@@ -256,6 +288,23 @@ async fn validate_message_polling(client: &IggyClient) {
         let message = &polled_messages.messages[0];
         let offset = (i - 1) as u64;
         assert_eq!(message.header.offset, offset);
+
+        let payload = from_utf8(&message.payload).unwrap();
+
+        if partition_id.is_none() {
+            partition_id = 
Some(parse_partition_id_from_extended_payload(payload));
+        }
+
+        // For balanced partitioning: entity_id = partition_id + offset * 
PARTITIONS_COUNT
+        let p = partition_id.unwrap();
+        let expected_entity_id = p + (offset as u32) * PARTITIONS_COUNT;
+        let expected_payload = create_extended_message_payload(p, 
expected_entity_id);
+
+        assert_eq!(
+            payload, expected_payload,
+            "Payload mismatch at offset {}: expected '{}', got '{}'",
+            offset, expected_payload, payload
+        );
     }
 
     let polled_messages = client
@@ -273,6 +322,13 @@ async fn validate_message_polling(client: &IggyClient) {
     assert!(polled_messages.messages.is_empty())
 }
 
+fn parse_partition_id_from_extended_payload(payload: &str) -> u32 {
+    let parts: Vec<&str> = payload.split('-').collect();
+    parts[1]
+        .parse()
+        .expect("partition_id should be a valid u32")
+}
+
 fn create_extended_message_payload(partition_id: u32, entity_id: u32) -> 
String {
     format!("message-{partition_id}-{entity_id}")
 }
diff --git 
a/core/integration/tests/server/scenarios/consumer_group_with_single_client_polling_messages_scenario.rs
 
b/core/integration/tests/server/scenarios/consumer_group_with_single_client_polling_messages_scenario.rs
index 5eece2dcb..166267390 100644
--- 
a/core/integration/tests/server/scenarios/consumer_group_with_single_client_polling_messages_scenario.rs
+++ 
b/core/integration/tests/server/scenarios/consumer_group_with_single_client_polling_messages_scenario.rs
@@ -22,6 +22,7 @@ use crate::server::scenarios::{
 };
 use iggy::prelude::*;
 use integration::test_server::{ClientFactory, assert_clean_system, login_root};
+use std::collections::HashSet;
 use std::str::{FromStr, from_utf8};
 
 pub async fn run(client_factory: &dyn ClientFactory) {
@@ -99,8 +100,11 @@ async fn execute_using_messages_key_key(client: 
&IggyClient) {
     }
 
     // 2. Poll the messages for the single client which has assigned all 
partitions in the consumer group
+    // Hash-based partitioning distributes messages unpredictably across 
partitions, and polling
+    // round-robins across partitions, so we verify completeness rather than 
order.
     let consumer = 
Consumer::group(Identifier::named(CONSUMER_GROUP_NAME).unwrap());
-    let mut total_read_messages_count = 0;
+    let mut received_entity_ids = HashSet::new();
+
     for _ in 1..=PARTITIONS_COUNT * MESSAGES_COUNT {
         let polled_messages = client
             .poll_messages(
@@ -115,10 +119,39 @@ async fn execute_using_messages_key_key(client: 
&IggyClient) {
             .await
             .unwrap();
 
-        total_read_messages_count += polled_messages.messages.len() as u32;
+        for message in &polled_messages.messages {
+            let payload = from_utf8(&message.payload).unwrap();
+            let entity_id = parse_entity_id_from_payload(payload);
+            assert!(
+                (1..=MESSAGES_COUNT).contains(&entity_id),
+                "entity_id {} out of expected range 1..={}",
+                entity_id,
+                MESSAGES_COUNT
+            );
+            received_entity_ids.insert(entity_id);
+        }
     }
 
-    assert_eq!(total_read_messages_count, MESSAGES_COUNT);
+    assert_eq!(
+        received_entity_ids.len() as u32,
+        MESSAGES_COUNT,
+        "Expected {} unique messages, got {}",
+        MESSAGES_COUNT,
+        received_entity_ids.len()
+    );
+    let expected: HashSet<u32> = (1..=MESSAGES_COUNT).collect();
+    assert_eq!(
+        received_entity_ids, expected,
+        "Missing or duplicate messages"
+    );
+}
+
+fn parse_entity_id_from_payload(payload: &str) -> u32 {
+    payload
+        .strip_prefix("message-")
+        .expect("payload should start with 'message-'")
+        .parse()
+        .expect("entity_id should be a valid u32")
 }
 
 fn create_message_payload(entity_id: u32) -> String {
diff --git a/core/server/src/metadata/writer.rs 
b/core/server/src/metadata/writer.rs
index f76d35abc..ca97842b1 100644
--- a/core/server/src/metadata/writer.rs
+++ b/core/server/src/metadata/writer.rs
@@ -66,13 +66,15 @@ impl MetadataWriter {
     }
 
     pub fn add_stream(&mut self, meta: StreamMeta) -> StreamId {
-        let assigned_id = Arc::new(AtomicUsize::new(0));
+        let assigned_id = Arc::new(AtomicUsize::new(usize::MAX));
         self.append(MetadataOp::AddStream {
             meta,
             assigned_id: assigned_id.clone(),
         });
         self.publish();
-        assigned_id.load(Ordering::Acquire)
+        let id = assigned_id.load(Ordering::Acquire);
+        debug_assert_ne!(id, usize::MAX, "add_stream should always succeed");
+        id
     }
 
     pub fn update_stream(&mut self, id: StreamId, new_name: Arc<str>) {
@@ -189,13 +191,15 @@ impl MetadataWriter {
     }
 
     pub fn add_user(&mut self, meta: UserMeta) -> UserId {
-        let assigned_id = Arc::new(AtomicUsize::new(0));
+        let assigned_id = Arc::new(AtomicUsize::new(usize::MAX));
         self.append(MetadataOp::AddUser {
             meta,
             assigned_id: assigned_id.clone(),
         });
         self.publish();
-        assigned_id.load(Ordering::Acquire) as UserId
+        let id = assigned_id.load(Ordering::Acquire);
+        debug_assert_ne!(id, usize::MAX, "add_user should always succeed");
+        id as UserId
     }
 
     pub fn update_user_meta(&mut self, id: UserId, meta: UserMeta) {


Reply via email to