This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch global-metadata-leftright
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/global-metadata-leftright by this push:
     new 8a1a24ced refactor concurrent scenario
8a1a24ced is described below

commit 8a1a24ced84f97828281af9d46d60c594d5bc99f
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Fri Jan 23 10:58:13 2026 +0100

    refactor concurrent scenario
---
 .../tests/server/concurrent_addition.rs            |  15 +-
 .../tests/server/scenarios/concurrent_scenario.rs  | 560 ++++++++++++---------
 2 files changed, 332 insertions(+), 243 deletions(-)

diff --git a/core/integration/tests/server/concurrent_addition.rs b/core/integration/tests/server/concurrent_addition.rs
index c63681340..a1618f6b8 100644
--- a/core/integration/tests/server/concurrent_addition.rs
+++ b/core/integration/tests/server/concurrent_addition.rs
@@ -30,16 +30,17 @@ use test_case::test_matrix;
 // Test matrix for race condition scenarios
 // Tests all combinations of:
 // - Transport: TCP, HTTP, QUIC, WebSocket (4)
-// - Resource: User, Stream, Topic (3)
+// - Resource: User, Stream, Topic, Partition, ConsumerGroup (5)
 // - Path: Hot (unique names), Cold (duplicate names) (2)
 // - Barrier: On (synchronized), Off (unsynchronized) (2)
-// Total: 4 × 3 × 2 × 2 = 48 test cases
+// Total: 4 × 5 × 2 × 2 = 80 test cases
+// Note: Partition + Cold is skipped (partitions don't have names)
 
 // TODO: Websocket fails for the `cold` type, cold means that we are creating resources with the same name.
 // It fails with the error assertion, instead of `AlreadyExist`, we get generic `Error`.
 #[test_matrix(
     [tcp(), http(), quic(), websocket()],
-    [user(), stream(), topic()],
+    [user(), stream(), topic(), partition(), consumer_group()],
     [hot(), cold()],
     [barrier_on(), barrier_off()]
 )]
@@ -119,6 +120,14 @@ fn topic() -> ResourceType {
     ResourceType::Topic
 }
 
+fn partition() -> ResourceType {
+    ResourceType::Partition
+}
+
+fn consumer_group() -> ResourceType {
+    ResourceType::ConsumerGroup
+}
+
 fn hot() -> ScenarioType {
     ScenarioType::Hot
 }
diff --git a/core/integration/tests/server/scenarios/concurrent_scenario.rs b/core/integration/tests/server/scenarios/concurrent_scenario.rs
index 5140b42b6..d42296ba2 100644
--- a/core/integration/tests/server/scenarios/concurrent_scenario.rs
+++ b/core/integration/tests/server/scenarios/concurrent_scenario.rs
@@ -19,19 +19,16 @@
 use crate::server::scenarios::create_client;
 use futures::future::join_all;
 use iggy::prelude::*;
-use iggy_common::UserInfo;
+use iggy_common::{ConsumerGroup, UserInfo};
 use integration::test_server::{ClientFactory, login_root};
 use std::sync::Arc;
 use tokio::sync::Barrier;
 
-const OPERATIONS_COUNT: usize = 40;
-const MULTIPLE_CLIENT_COUNT: usize = 10;
-const OPERATIONS_PER_CLIENT: usize = OPERATIONS_COUNT / MULTIPLE_CLIENT_COUNT;
+const CONCURRENT_CLIENTS: usize = 20;
 const USER_PASSWORD: &str = "secret";
 const TEST_STREAM_NAME: &str = "race-test-stream";
 const TEST_TOPIC_NAME: &str = "race-test-topic";
 const PARTITIONS_COUNT: u32 = 1;
-const PARTITIONS_TO_ADD: u32 = 1;
 
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
 pub enum ResourceType {
@@ -39,7 +36,7 @@ pub enum ResourceType {
     Stream,
     Topic,
     Partition,
-    // TODO(hubcio): add ConsumerGroup
+    ConsumerGroup,
 }
 
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -72,8 +69,8 @@ pub async fn run(
         root_client.create_stream(TEST_STREAM_NAME).await.unwrap();
     }
 
-    // For partition tests, create parent stream and topic first
-    if resource_type == ResourceType::Partition {
+    // For partition and consumer group tests, create parent stream and topic 
first
+    if resource_type == ResourceType::Partition || resource_type == 
ResourceType::ConsumerGroup {
         root_client.create_stream(TEST_STREAM_NAME).await.unwrap();
         let stream_id = Identifier::named(TEST_STREAM_NAME).unwrap();
         root_client
@@ -91,29 +88,35 @@ pub async fn run(
     }
 
     let results = match (resource_type, scenario_type) {
-        (ResourceType::User, ScenarioType::Cold) => {
-            execute_multiple_clients_users_cold(client_factory, 
use_barrier).await
-        }
         (ResourceType::User, ScenarioType::Hot) => {
-            execute_multiple_clients_users_hot(client_factory, 
use_barrier).await
+            execute_users_hot(client_factory, use_barrier).await
+        }
+        (ResourceType::User, ScenarioType::Cold) => {
+            execute_users_cold(client_factory, use_barrier).await
         }
         (ResourceType::Stream, ScenarioType::Hot) => {
-            execute_multiple_clients_streams_hot(client_factory, 
use_barrier).await
+            execute_streams_hot(client_factory, use_barrier).await
         }
         (ResourceType::Stream, ScenarioType::Cold) => {
-            execute_multiple_clients_streams_cold(client_factory, 
use_barrier).await
+            execute_streams_cold(client_factory, use_barrier).await
         }
         (ResourceType::Topic, ScenarioType::Hot) => {
-            execute_multiple_clients_topics_hot(client_factory, 
use_barrier).await
+            execute_topics_hot(client_factory, use_barrier).await
         }
         (ResourceType::Topic, ScenarioType::Cold) => {
-            execute_multiple_clients_topics_cold(client_factory, 
use_barrier).await
+            execute_topics_cold(client_factory, use_barrier).await
         }
         (ResourceType::Partition, ScenarioType::Hot) => {
-            execute_multiple_clients_partitions_hot(client_factory, 
use_barrier).await
+            execute_partitions_hot(client_factory, use_barrier).await
         }
         // Partitions don't have names, so Cold scenario doesn't apply
         (ResourceType::Partition, ScenarioType::Cold) => vec![],
+        (ResourceType::ConsumerGroup, ScenarioType::Hot) => {
+            execute_consumer_groups_hot(client_factory, use_barrier).await
+        }
+        (ResourceType::ConsumerGroup, ScenarioType::Cold) => {
+            execute_consumer_groups_cold(client_factory, use_barrier).await
+        }
     };
 
     if !results.is_empty() {
@@ -123,320 +126,299 @@ pub async fn run(
     }
 }
 
-async fn execute_multiple_clients_users_hot(
+async fn execute_users_hot(
     client_factory: &dyn ClientFactory,
     use_barrier: bool,
 ) -> Vec<OperationResult> {
-    let mut handles = Vec::with_capacity(MULTIPLE_CLIENT_COUNT);
-    let barrier = if use_barrier {
-        Some(Arc::new(Barrier::new(MULTIPLE_CLIENT_COUNT)))
-    } else {
-        None
-    };
+    let barrier = use_barrier.then(|| 
Arc::new(Barrier::new(CONCURRENT_CLIENTS)));
 
-    for client_id in 0..MULTIPLE_CLIENT_COUNT {
+    let mut handles = Vec::with_capacity(CONCURRENT_CLIENTS);
+    for client_id in 0..CONCURRENT_CLIENTS {
         let client = create_client(client_factory).await;
         login_root(&client).await;
+        let barrier = barrier.clone();
 
-        let barrier_clone = barrier.clone();
-        let handle = tokio::spawn(async move {
-            if let Some(b) = barrier_clone {
+        handles.push(tokio::spawn(async move {
+            if let Some(b) = barrier {
                 b.wait().await;
             }
-
-            let mut results = Vec::with_capacity(OPERATIONS_PER_CLIENT);
-            for i in 0..OPERATIONS_PER_CLIENT {
-                let username = format!("race-user-{}-{}", client_id, i);
-                let result = client
-                    .create_user(&username, USER_PASSWORD, UserStatus::Active, 
None)
-                    .await
-                    .map(|_| ());
-                results.push(result);
-            }
-            results
-        });
-
-        handles.push(handle);
+            let username = format!("race-user-{}", client_id);
+            client
+                .create_user(&username, USER_PASSWORD, UserStatus::Active, 
None)
+                .await
+                .map(|_| ())
+        }));
     }
 
-    let all_results = join_all(handles).await;
-    all_results
+    join_all(handles)
+        .await
         .into_iter()
-        .flat_map(|r| r.expect("Tokio task panicked"))
+        .map(|r| r.expect("task panicked"))
         .collect()
 }
 
-async fn execute_multiple_clients_users_cold(
+async fn execute_users_cold(
     client_factory: &dyn ClientFactory,
     use_barrier: bool,
 ) -> Vec<OperationResult> {
-    let mut handles = Vec::with_capacity(MULTIPLE_CLIENT_COUNT);
     const DUPLICATE_USER: &str = "race-user-duplicate";
-    let barrier = if use_barrier {
-        Some(Arc::new(Barrier::new(MULTIPLE_CLIENT_COUNT)))
-    } else {
-        None
-    };
+    let barrier = use_barrier.then(|| 
Arc::new(Barrier::new(CONCURRENT_CLIENTS)));
 
-    for _ in 0..MULTIPLE_CLIENT_COUNT {
+    let mut handles = Vec::with_capacity(CONCURRENT_CLIENTS);
+    for _ in 0..CONCURRENT_CLIENTS {
         let client = create_client(client_factory).await;
         login_root(&client).await;
+        let barrier = barrier.clone();
 
-        let barrier_clone = barrier.clone();
-        let handle = tokio::spawn(async move {
-            if let Some(b) = barrier_clone {
+        handles.push(tokio::spawn(async move {
+            if let Some(b) = barrier {
                 b.wait().await;
             }
-
-            let mut results = Vec::with_capacity(MULTIPLE_CLIENT_COUNT);
-            for _ in 0..OPERATIONS_PER_CLIENT {
-                let result = client
-                    .create_user(DUPLICATE_USER, USER_PASSWORD, 
UserStatus::Active, None)
-                    .await
-                    .map(|_| ());
-                results.push(result);
-            }
-            results
-        });
-
-        handles.push(handle);
+            client
+                .create_user(DUPLICATE_USER, USER_PASSWORD, 
UserStatus::Active, None)
+                .await
+                .map(|_| ())
+        }));
     }
 
-    let all_results = join_all(handles).await;
-    all_results
+    join_all(handles)
+        .await
         .into_iter()
-        .flat_map(|r| r.expect("Tokio task panicked"))
+        .map(|r| r.expect("task panicked"))
         .collect()
 }
 
-async fn execute_multiple_clients_streams_hot(
+async fn execute_streams_hot(
     client_factory: &dyn ClientFactory,
     use_barrier: bool,
 ) -> Vec<OperationResult> {
-    let mut handles = Vec::with_capacity(MULTIPLE_CLIENT_COUNT);
-    let barrier = if use_barrier {
-        Some(Arc::new(Barrier::new(MULTIPLE_CLIENT_COUNT)))
-    } else {
-        None
-    };
+    let barrier = use_barrier.then(|| 
Arc::new(Barrier::new(CONCURRENT_CLIENTS)));
 
-    for client_id in 0..MULTIPLE_CLIENT_COUNT {
+    let mut handles = Vec::with_capacity(CONCURRENT_CLIENTS);
+    for client_id in 0..CONCURRENT_CLIENTS {
         let client = create_client(client_factory).await;
         login_root(&client).await;
+        let barrier = barrier.clone();
 
-        let barrier_clone = barrier.clone();
-        let handle = tokio::spawn(async move {
-            if let Some(b) = barrier_clone {
+        handles.push(tokio::spawn(async move {
+            if let Some(b) = barrier {
                 b.wait().await;
             }
-
-            let mut results = Vec::with_capacity(OPERATIONS_PER_CLIENT);
-            for i in 0..OPERATIONS_PER_CLIENT {
-                let stream_name = format!("race-stream-{}-{}", client_id, i);
-                let result = client.create_stream(&stream_name).await.map(|_| 
());
-                results.push(result);
-            }
-            results
-        });
-
-        handles.push(handle);
+            let stream_name = format!("race-stream-{}", client_id);
+            client.create_stream(&stream_name).await.map(|_| ())
+        }));
     }
 
-    let all_results = join_all(handles).await;
-    all_results
+    join_all(handles)
+        .await
         .into_iter()
-        .flat_map(|r| r.expect("Tokio task panicked"))
+        .map(|r| r.expect("task panicked"))
         .collect()
 }
 
-async fn execute_multiple_clients_streams_cold(
+async fn execute_streams_cold(
     client_factory: &dyn ClientFactory,
     use_barrier: bool,
 ) -> Vec<OperationResult> {
-    let mut handles = Vec::with_capacity(MULTIPLE_CLIENT_COUNT);
     const DUPLICATE_STREAM: &str = "race-stream-duplicate";
-    let barrier = if use_barrier {
-        Some(Arc::new(Barrier::new(MULTIPLE_CLIENT_COUNT)))
-    } else {
-        None
-    };
+    let barrier = use_barrier.then(|| 
Arc::new(Barrier::new(CONCURRENT_CLIENTS)));
 
-    for _ in 0..MULTIPLE_CLIENT_COUNT {
+    let mut handles = Vec::with_capacity(CONCURRENT_CLIENTS);
+    for _ in 0..CONCURRENT_CLIENTS {
         let client = create_client(client_factory).await;
         login_root(&client).await;
+        let barrier = barrier.clone();
 
-        let barrier_clone = barrier.clone();
-        let handle = tokio::spawn(async move {
-            if let Some(b) = barrier_clone {
+        handles.push(tokio::spawn(async move {
+            if let Some(b) = barrier {
                 b.wait().await;
             }
-
-            let mut results = Vec::with_capacity(OPERATIONS_PER_CLIENT);
-            for _ in 0..OPERATIONS_PER_CLIENT {
-                let result = 
client.create_stream(DUPLICATE_STREAM).await.map(|_| ());
-                results.push(result);
-            }
-            results
-        });
-
-        handles.push(handle);
+            client.create_stream(DUPLICATE_STREAM).await.map(|_| ())
+        }));
     }
 
-    let all_results = join_all(handles).await;
-    all_results
+    join_all(handles)
+        .await
         .into_iter()
-        .flat_map(|r| r.expect("Tokio task panicked"))
+        .map(|r| r.expect("task panicked"))
         .collect()
 }
 
-async fn execute_multiple_clients_topics_hot(
+async fn execute_topics_hot(
     client_factory: &dyn ClientFactory,
     use_barrier: bool,
 ) -> Vec<OperationResult> {
-    let mut handles = Vec::with_capacity(MULTIPLE_CLIENT_COUNT);
-    let barrier = if use_barrier {
-        Some(Arc::new(Barrier::new(MULTIPLE_CLIENT_COUNT)))
-    } else {
-        None
-    };
+    let barrier = use_barrier.then(|| 
Arc::new(Barrier::new(CONCURRENT_CLIENTS)));
 
-    for client_id in 0..MULTIPLE_CLIENT_COUNT {
+    let mut handles = Vec::with_capacity(CONCURRENT_CLIENTS);
+    for client_id in 0..CONCURRENT_CLIENTS {
         let client = create_client(client_factory).await;
         login_root(&client).await;
+        let barrier = barrier.clone();
 
-        let barrier_clone = barrier.clone();
-        let handle = tokio::spawn(async move {
-            if let Some(b) = barrier_clone {
+        handles.push(tokio::spawn(async move {
+            if let Some(b) = barrier {
                 b.wait().await;
             }
-
-            let mut results = Vec::with_capacity(OPERATIONS_PER_CLIENT);
             let stream_id = Identifier::named(TEST_STREAM_NAME).unwrap();
-
-            for i in 0..OPERATIONS_PER_CLIENT {
-                let topic_name = format!("race-topic-{}-{}", client_id, i);
-                let result = client
-                    .create_topic(
-                        &stream_id,
-                        &topic_name,
-                        PARTITIONS_COUNT,
-                        CompressionAlgorithm::default(),
-                        None,
-                        IggyExpiry::NeverExpire,
-                        MaxTopicSize::ServerDefault,
-                    )
-                    .await
-                    .map(|_| ());
-                results.push(result);
-            }
-            results
-        });
-
-        handles.push(handle);
+            let topic_name = format!("race-topic-{}", client_id);
+            client
+                .create_topic(
+                    &stream_id,
+                    &topic_name,
+                    PARTITIONS_COUNT,
+                    CompressionAlgorithm::default(),
+                    None,
+                    IggyExpiry::NeverExpire,
+                    MaxTopicSize::ServerDefault,
+                )
+                .await
+                .map(|_| ())
+        }));
     }
 
-    let all_results = join_all(handles).await;
-    all_results
+    join_all(handles)
+        .await
         .into_iter()
-        .flat_map(|r| r.expect("Tokio task panicked"))
+        .map(|r| r.expect("task panicked"))
         .collect()
 }
 
-async fn execute_multiple_clients_topics_cold(
+async fn execute_topics_cold(
     client_factory: &dyn ClientFactory,
     use_barrier: bool,
 ) -> Vec<OperationResult> {
-    let mut handles = Vec::with_capacity(MULTIPLE_CLIENT_COUNT);
     const DUPLICATE_TOPIC: &str = "race-topic-duplicate";
-    let barrier = if use_barrier {
-        Some(Arc::new(Barrier::new(MULTIPLE_CLIENT_COUNT)))
-    } else {
-        None
-    };
+    let barrier = use_barrier.then(|| 
Arc::new(Barrier::new(CONCURRENT_CLIENTS)));
 
-    for _ in 0..MULTIPLE_CLIENT_COUNT {
+    let mut handles = Vec::with_capacity(CONCURRENT_CLIENTS);
+    for _ in 0..CONCURRENT_CLIENTS {
         let client = create_client(client_factory).await;
         login_root(&client).await;
+        let barrier = barrier.clone();
 
-        let barrier_clone = barrier.clone();
-        let handle = tokio::spawn(async move {
-            if let Some(b) = barrier_clone {
+        handles.push(tokio::spawn(async move {
+            if let Some(b) = barrier {
                 b.wait().await;
             }
-
-            let mut results = Vec::with_capacity(OPERATIONS_PER_CLIENT);
             let stream_id = Identifier::named(TEST_STREAM_NAME).unwrap();
+            client
+                .create_topic(
+                    &stream_id,
+                    DUPLICATE_TOPIC,
+                    PARTITIONS_COUNT,
+                    CompressionAlgorithm::default(),
+                    None,
+                    IggyExpiry::NeverExpire,
+                    MaxTopicSize::ServerDefault,
+                )
+                .await
+                .map(|_| ())
+        }));
+    }
 
-            for _ in 0..OPERATIONS_PER_CLIENT {
-                let result = client
-                    .create_topic(
-                        &stream_id,
-                        DUPLICATE_TOPIC,
-                        PARTITIONS_COUNT,
-                        CompressionAlgorithm::default(),
-                        None,
-                        IggyExpiry::NeverExpire,
-                        MaxTopicSize::ServerDefault,
-                    )
-                    .await
-                    .map(|_| ());
-                results.push(result);
-            }
-            results
-        });
+    join_all(handles)
+        .await
+        .into_iter()
+        .map(|r| r.expect("task panicked"))
+        .collect()
+}
 
-        handles.push(handle);
+async fn execute_partitions_hot(
+    client_factory: &dyn ClientFactory,
+    use_barrier: bool,
+) -> Vec<OperationResult> {
+    let barrier = use_barrier.then(|| 
Arc::new(Barrier::new(CONCURRENT_CLIENTS)));
+
+    let mut handles = Vec::with_capacity(CONCURRENT_CLIENTS);
+    for _ in 0..CONCURRENT_CLIENTS {
+        let client = create_client(client_factory).await;
+        login_root(&client).await;
+        let barrier = barrier.clone();
+
+        handles.push(tokio::spawn(async move {
+            if let Some(b) = barrier {
+                b.wait().await;
+            }
+            let stream_id = Identifier::named(TEST_STREAM_NAME).unwrap();
+            let topic_id = Identifier::named(TEST_TOPIC_NAME).unwrap();
+            client
+                .create_partitions(&stream_id, &topic_id, 1)
+                .await
+                .map(|_| ())
+        }));
     }
 
-    let all_results = join_all(handles).await;
-    all_results
+    join_all(handles)
+        .await
         .into_iter()
-        .flat_map(|r| r.expect("Tokio task panicked"))
+        .map(|r| r.expect("task panicked"))
         .collect()
 }
 
-async fn execute_multiple_clients_partitions_hot(
+async fn execute_consumer_groups_hot(
     client_factory: &dyn ClientFactory,
     use_barrier: bool,
 ) -> Vec<OperationResult> {
-    let mut handles = Vec::with_capacity(MULTIPLE_CLIENT_COUNT);
-    let barrier = if use_barrier {
-        Some(Arc::new(Barrier::new(MULTIPLE_CLIENT_COUNT)))
-    } else {
-        None
-    };
+    let barrier = use_barrier.then(|| 
Arc::new(Barrier::new(CONCURRENT_CLIENTS)));
 
-    for _ in 0..MULTIPLE_CLIENT_COUNT {
+    let mut handles = Vec::with_capacity(CONCURRENT_CLIENTS);
+    for client_id in 0..CONCURRENT_CLIENTS {
         let client = create_client(client_factory).await;
         login_root(&client).await;
+        let barrier = barrier.clone();
 
-        let barrier_clone = barrier.clone();
-        let handle = tokio::spawn(async move {
-            if let Some(b) = barrier_clone {
+        handles.push(tokio::spawn(async move {
+            if let Some(b) = barrier {
                 b.wait().await;
             }
-
-            let mut results = Vec::with_capacity(OPERATIONS_PER_CLIENT);
             let stream_id = Identifier::named(TEST_STREAM_NAME).unwrap();
             let topic_id = Identifier::named(TEST_TOPIC_NAME).unwrap();
+            let group_name = format!("race-consumer-group-{}", client_id);
+            client
+                .create_consumer_group(&stream_id, &topic_id, &group_name)
+                .await
+                .map(|_| ())
+        }));
+    }
 
-            for _ in 0..OPERATIONS_PER_CLIENT {
-                let result = client
-                    .create_partitions(&stream_id, &topic_id, 
PARTITIONS_TO_ADD)
-                    .await
-                    .map(|_| ());
-                results.push(result);
-            }
-            results
-        });
+    join_all(handles)
+        .await
+        .into_iter()
+        .map(|r| r.expect("task panicked"))
+        .collect()
+}
+
+async fn execute_consumer_groups_cold(
+    client_factory: &dyn ClientFactory,
+    use_barrier: bool,
+) -> Vec<OperationResult> {
+    const DUPLICATE_CONSUMER_GROUP: &str = "race-consumer-group-duplicate";
+    let barrier = use_barrier.then(|| 
Arc::new(Barrier::new(CONCURRENT_CLIENTS)));
 
-        handles.push(handle);
+    let mut handles = Vec::with_capacity(CONCURRENT_CLIENTS);
+    for _ in 0..CONCURRENT_CLIENTS {
+        let client = create_client(client_factory).await;
+        login_root(&client).await;
+        let barrier = barrier.clone();
+
+        handles.push(tokio::spawn(async move {
+            if let Some(b) = barrier {
+                b.wait().await;
+            }
+            let stream_id = Identifier::named(TEST_STREAM_NAME).unwrap();
+            let topic_id = Identifier::named(TEST_TOPIC_NAME).unwrap();
+            client
+                .create_consumer_group(&stream_id, &topic_id, 
DUPLICATE_CONSUMER_GROUP)
+                .await
+                .map(|_| ())
+        }));
     }
 
-    let all_results = join_all(handles).await;
-    all_results
+    join_all(handles)
+        .await
         .into_iter()
-        .flat_map(|r| r.expect("Tokio task panicked"))
+        .map(|r| r.expect("task panicked"))
         .collect()
 }
 
@@ -449,9 +431,9 @@ fn validate_results(results: &[OperationResult], scenario_type: ScenarioType) {
             // Hot path: all operations should succeed
             assert_eq!(
                 success_count,
-                OPERATIONS_COUNT,
+                CONCURRENT_CLIENTS,
                 "Hot path: Expected all {} operations to succeed, but only {} 
succeeded. Errors: {:?}",
-                OPERATIONS_COUNT,
+                CONCURRENT_CLIENTS,
                 success_count,
                 results.iter().filter(|r| r.is_err()).collect::<Vec<_>>()
             );
@@ -470,9 +452,9 @@ fn validate_results(results: &[OperationResult], scenario_type: ScenarioType) {
             );
             assert_eq!(
                 error_count,
-                OPERATIONS_COUNT - 1,
+                CONCURRENT_CLIENTS - 1,
                 "Cold path: Expected {} errors, but got {}",
-                OPERATIONS_COUNT - 1,
+                CONCURRENT_CLIENTS - 1,
                 error_count
             );
 
@@ -485,6 +467,7 @@ fn validate_results(results: &[OperationResult], scenario_type: ScenarioType) {
                         IggyError::UserAlreadyExists
                             | IggyError::StreamNameAlreadyExists(_)
                             | IggyError::TopicNameAlreadyExists(_, _)
+                            | IggyError::ConsumerGroupNameAlreadyExists(_, _)
                             | IggyError::HttpResponseError(400, _)
                     ),
                     "Expected 'already exists' error, got: {:?}",
@@ -624,6 +607,36 @@ async fn validate_server_state(
                 );
             }
         }
+        ResourceType::ConsumerGroup => {
+            let mut group_states = Vec::new();
+            for client in &clients {
+                let state = validate_consumer_groups_state(client, 
scenario_type).await;
+                group_states.push(state);
+            }
+
+            let first_state = &group_states[0];
+            for (i, state) in group_states.iter().enumerate().skip(1) {
+                assert_eq!(
+                    state.len(),
+                    first_state.len(),
+                    "Client {} sees different number of consumer groups ({}) 
than client 0 ({})",
+                    i,
+                    state.len(),
+                    first_state.len()
+                );
+
+                let mut first_names: Vec<_> = first_state.iter().map(|g| 
&g.name).collect();
+                let mut current_names: Vec<_> = state.iter().map(|g| 
&g.name).collect();
+                first_names.sort();
+                current_names.sort();
+
+                assert_eq!(
+                    current_names, first_names,
+                    "Client {} sees different consumer groups than client 0",
+                    i
+                );
+            }
+        }
     }
 }
 
@@ -636,12 +649,12 @@ async fn validate_users_state(client: &IggyClient, scenario_type: ScenarioType)
 
     match scenario_type {
         ScenarioType::Hot => {
-            // Hot path: should have OPERATIONS_COUNT unique users
+            // Hot path: should have CONCURRENT_CLIENTS unique users
             assert_eq!(
                 test_users.len(),
-                OPERATIONS_COUNT,
+                CONCURRENT_CLIENTS,
                 "Hot path: Expected {} users, but found {}. Users: {:?}",
-                OPERATIONS_COUNT,
+                CONCURRENT_CLIENTS,
                 test_users.len(),
                 test_users.iter().map(|u| &u.username).collect::<Vec<_>>()
             );
@@ -697,12 +710,12 @@ async fn validate_streams_state(client: &IggyClient, scenario_type: ScenarioType
 
     match scenario_type {
         ScenarioType::Hot => {
-            // Hot path: should have OPERATIONS_COUNT unique streams
+            // Hot path: should have CONCURRENT_CLIENTS unique streams
             assert_eq!(
                 test_streams.len(),
-                OPERATIONS_COUNT,
+                CONCURRENT_CLIENTS,
                 "Hot path: Expected {} streams, but found {}. Streams: {:?}",
-                OPERATIONS_COUNT,
+                CONCURRENT_CLIENTS,
                 test_streams.len(),
                 test_streams.iter().map(|s| &s.name).collect::<Vec<_>>()
             );
@@ -765,12 +778,12 @@ async fn validate_topics_state(client: &IggyClient, scenario_type: ScenarioType)
 
     match scenario_type {
         ScenarioType::Hot => {
-            // Hot path: should have OPERATIONS_COUNT unique topics
+            // Hot path: should have CONCURRENT_CLIENTS unique topics
             assert_eq!(
                 test_topics.len(),
-                OPERATIONS_COUNT,
+                CONCURRENT_CLIENTS,
                 "Hot path: Expected {} topics, but found {}. Topics: {:?}",
-                OPERATIONS_COUNT,
+                CONCURRENT_CLIENTS,
                 test_topics.len(),
                 test_topics.iter().map(|t| &t.name).collect::<Vec<_>>()
             );
@@ -826,17 +839,82 @@ async fn validate_partitions_state(client: &IggyClient) -> u32 {
         .expect("Failed to get test topic")
         .expect("Test topic not found");
 
-    // Expected: initial partition + OPERATIONS_COUNT added partitions
-    let expected_partitions = PARTITIONS_COUNT + (OPERATIONS_COUNT as u32 * 
PARTITIONS_TO_ADD);
+    // Expected: initial partition + CONCURRENT_CLIENTS added partitions (1 
each)
+    let expected_partitions = PARTITIONS_COUNT + CONCURRENT_CLIENTS as u32;
     assert_eq!(
         topic.partitions_count, expected_partitions,
-        "Hot path: Expected {} partitions (1 initial + {} added), but found 
{}",
-        expected_partitions, OPERATIONS_COUNT, topic.partitions_count
+        "Expected {} partitions (1 initial + {} added), but found {}",
+        expected_partitions, CONCURRENT_CLIENTS, topic.partitions_count
     );
 
     topic.partitions_count
 }
 
+async fn validate_consumer_groups_state(
+    client: &IggyClient,
+    scenario_type: ScenarioType,
+) -> Vec<ConsumerGroup> {
+    let stream_id = Identifier::named(TEST_STREAM_NAME).unwrap();
+    let topic_id = Identifier::named(TEST_TOPIC_NAME).unwrap();
+    let groups = client
+        .get_consumer_groups(&stream_id, &topic_id)
+        .await
+        .expect("Failed to get consumer groups");
+    let test_groups: Vec<_> = groups
+        .into_iter()
+        .filter(|g| g.name.starts_with("race-consumer-group-"))
+        .collect();
+
+    match scenario_type {
+        ScenarioType::Hot => {
+            assert_eq!(
+                test_groups.len(),
+                CONCURRENT_CLIENTS,
+                "Hot path: Expected {} consumer groups, but found {}. Groups: 
{:?}",
+                CONCURRENT_CLIENTS,
+                test_groups.len(),
+                test_groups.iter().map(|g| &g.name).collect::<Vec<_>>()
+            );
+
+            let mut ids: Vec<u32> = test_groups.iter().map(|g| g.id).collect();
+            ids.sort_unstable();
+            let unique_ids: std::collections::HashSet<u32> = 
ids.iter().copied().collect();
+            assert_eq!(
+                unique_ids.len(),
+                test_groups.len(),
+                "Hot path: Found duplicate consumer group IDs: {:?}",
+                ids
+            );
+
+            let names: std::collections::HashSet<&str> =
+                test_groups.iter().map(|g| g.name.as_str()).collect();
+            assert_eq!(
+                names.len(),
+                test_groups.len(),
+                "Hot path: Found duplicate consumer group names"
+            );
+        }
+        ScenarioType::Cold => {
+            assert_eq!(
+                test_groups.len(),
+                1,
+                "Cold path: Expected exactly 1 consumer group, but found {}. 
Groups: {:?}",
+                test_groups.len(),
+                test_groups.iter().map(|g| &g.name).collect::<Vec<_>>()
+            );
+
+            let group = &test_groups[0];
+            assert_eq!(
+                group.name, "race-consumer-group-duplicate",
+                "Cold path: Expected consumer group named 
'race-consumer-group-duplicate', found '{}'",
+                group.name
+            );
+        }
+    }
+
+    test_groups
+}
+
 async fn cleanup_resources(client: &IggyClient, resource_type: ResourceType) {
     match resource_type {
         ResourceType::User => {
@@ -852,13 +930,15 @@ async fn cleanup_resources(client: &IggyClient, resource_type: ResourceType) {
         ResourceType::Stream => {
             let streams = client.get_streams().await.unwrap();
             for stream in streams {
-                let _ = client
-                    .delete_stream(&Identifier::numeric(stream.id).unwrap())
-                    .await;
+                if stream.name.starts_with("race-stream-") {
+                    let _ = client
+                        .delete_stream(&Identifier::numeric(stream.id).unwrap())
+                        .await;
+                }
             }
         }
-        // Just delete test stream (also cleans up topics and partitions)
-        ResourceType::Topic | ResourceType::Partition => {
+        // Just delete test stream (also cleans up topics, partitions, and 
consumer groups)
+        ResourceType::Topic | ResourceType::Partition | 
ResourceType::ConsumerGroup => {
             let _ = client
                 .delete_stream(&Identifier::named(TEST_STREAM_NAME).unwrap())
                 .await;

Reply via email to