This is an automated email from the ASF dual-hosted git repository. hgruszecki pushed a commit to branch global-metadata-leftright in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 0e81d8fb2a713995affe71ed5396c389701146e3 Author: Hubert Gruszecki <[email protected]> AuthorDate: Fri Jan 23 10:58:13 2026 +0100 refactor concurrent scenario, check messages order --- .../tests/server/concurrent_addition.rs | 15 +- .../tests/server/scenarios/concurrent_scenario.rs | 560 ++++++++++++--------- ...h_multiple_clients_polling_messages_scenario.rs | 60 ++- ...with_single_client_polling_messages_scenario.rs | 39 +- 4 files changed, 426 insertions(+), 248 deletions(-) diff --git a/core/integration/tests/server/concurrent_addition.rs b/core/integration/tests/server/concurrent_addition.rs index c63681340..a1618f6b8 100644 --- a/core/integration/tests/server/concurrent_addition.rs +++ b/core/integration/tests/server/concurrent_addition.rs @@ -30,16 +30,17 @@ use test_case::test_matrix; // Test matrix for race condition scenarios // Tests all combinations of: // - Transport: TCP, HTTP, QUIC, WebSocket (4) -// - Resource: User, Stream, Topic (3) +// - Resource: User, Stream, Topic, Partition, ConsumerGroup (5) // - Path: Hot (unique names), Cold (duplicate names) (2) // - Barrier: On (synchronized), Off (unsynchronized) (2) -// Total: 4 × 3 × 2 × 2 = 48 test cases +// Total: 4 × 5 × 2 × 2 = 80 test cases +// Note: Partition + Cold is skipped (partitions don't have names) // TODO: Websocket fails for the `cold` type, cold means that we are creating resources with the same name. // It fails with the error assertion, instead of `AlreadyExist`, we get generic `Error`. #[test_matrix( [tcp(), http(), quic(), websocket()], - [user(), stream(), topic()], + [user(), stream(), topic(), partition(), consumer_group()], [hot(), cold()], [barrier_on(), barrier_off()] )] @@ -119,6 +120,14 @@ fn topic() -> ResourceType { ResourceType::Topic } +fn partition() -> ResourceType { + ResourceType::Partition +} + +fn consumer_group() -> ResourceType { + ResourceType::ConsumerGroup +} + fn hot() -> ScenarioType { ScenarioType::Hot } diff --git a/core/integration/tests/server/scenarios/concurrent_scenario.rs b/core/integration/tests/server/scenarios/concurrent_scenario.rs index 5140b42b6..d42296ba2 100644 --- a/core/integration/tests/server/scenarios/concurrent_scenario.rs +++ b/core/integration/tests/server/scenarios/concurrent_scenario.rs @@ -19,19 +19,16 @@ use crate::server::scenarios::create_client; use futures::future::join_all; use iggy::prelude::*; -use iggy_common::UserInfo; +use iggy_common::{ConsumerGroup, UserInfo}; use integration::test_server::{ClientFactory, login_root}; use std::sync::Arc; use tokio::sync::Barrier; -const OPERATIONS_COUNT: usize = 40; -const MULTIPLE_CLIENT_COUNT: usize = 10; -const OPERATIONS_PER_CLIENT: usize = OPERATIONS_COUNT / MULTIPLE_CLIENT_COUNT; +const CONCURRENT_CLIENTS: usize = 20; const USER_PASSWORD: &str = "secret"; const TEST_STREAM_NAME: &str = "race-test-stream"; const TEST_TOPIC_NAME: &str = "race-test-topic"; const PARTITIONS_COUNT: u32 = 1; -const PARTITIONS_TO_ADD: u32 = 1; #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum ResourceType { @@ -39,7 +36,7 @@ pub enum ResourceType { Stream, Topic, Partition, - // TODO(hubcio): add ConsumerGroup + ConsumerGroup, } #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -72,8 +69,8 @@ pub async fn run( root_client.create_stream(TEST_STREAM_NAME).await.unwrap(); } - // For partition tests, create parent stream and topic first - if resource_type == ResourceType::Partition { + // For partition and consumer group tests, create parent stream and topic first + if resource_type == ResourceType::Partition || resource_type == ResourceType::ConsumerGroup { root_client.create_stream(TEST_STREAM_NAME).await.unwrap(); let stream_id = Identifier::named(TEST_STREAM_NAME).unwrap(); root_client @@ -91,29 +88,35 @@ pub async fn run( } let results = match (resource_type, scenario_type) { - (ResourceType::User, ScenarioType::Cold) => { - execute_multiple_clients_users_cold(client_factory, use_barrier).await - } (ResourceType::User, ScenarioType::Hot) => { - execute_multiple_clients_users_hot(client_factory, use_barrier).await + execute_users_hot(client_factory, use_barrier).await + } + (ResourceType::User, ScenarioType::Cold) => { + execute_users_cold(client_factory, use_barrier).await } (ResourceType::Stream, ScenarioType::Hot) => { - execute_multiple_clients_streams_hot(client_factory, use_barrier).await + execute_streams_hot(client_factory, use_barrier).await } (ResourceType::Stream, ScenarioType::Cold) => { - execute_multiple_clients_streams_cold(client_factory, use_barrier).await + execute_streams_cold(client_factory, use_barrier).await } (ResourceType::Topic, ScenarioType::Hot) => { - execute_multiple_clients_topics_hot(client_factory, use_barrier).await + execute_topics_hot(client_factory, use_barrier).await } (ResourceType::Topic, ScenarioType::Cold) => { - execute_multiple_clients_topics_cold(client_factory, use_barrier).await + execute_topics_cold(client_factory, use_barrier).await } (ResourceType::Partition, ScenarioType::Hot) => { - execute_multiple_clients_partitions_hot(client_factory, use_barrier).await + execute_partitions_hot(client_factory, use_barrier).await } // Partitions don't have names, so Cold scenario doesn't apply (ResourceType::Partition, ScenarioType::Cold) => vec![], + (ResourceType::ConsumerGroup, ScenarioType::Hot) => { + execute_consumer_groups_hot(client_factory, use_barrier).await + } + (ResourceType::ConsumerGroup, ScenarioType::Cold) => { + execute_consumer_groups_cold(client_factory, use_barrier).await + } }; if !results.is_empty() { @@ -123,320 +126,299 @@ pub async fn run( } } -async fn execute_multiple_clients_users_hot( +async fn execute_users_hot( client_factory: &dyn ClientFactory, use_barrier: bool, ) -> Vec<OperationResult> { - let mut handles = Vec::with_capacity(MULTIPLE_CLIENT_COUNT); - let barrier = if use_barrier { - Some(Arc::new(Barrier::new(MULTIPLE_CLIENT_COUNT))) - } else { - None - }; + let barrier = use_barrier.then(|| Arc::new(Barrier::new(CONCURRENT_CLIENTS))); - for client_id in 0..MULTIPLE_CLIENT_COUNT { + let mut handles = Vec::with_capacity(CONCURRENT_CLIENTS); + for client_id in 0..CONCURRENT_CLIENTS { let client = create_client(client_factory).await; login_root(&client).await; + let barrier = barrier.clone(); - let barrier_clone = barrier.clone(); - let handle = tokio::spawn(async move { - if let Some(b) = barrier_clone { + handles.push(tokio::spawn(async move { + if let Some(b) = barrier { b.wait().await; } - - let mut results = Vec::with_capacity(OPERATIONS_PER_CLIENT); - for i in 0..OPERATIONS_PER_CLIENT { - let username = format!("race-user-{}-{}", client_id, i); - let result = client - .create_user(&username, USER_PASSWORD, UserStatus::Active, None) - .await - .map(|_| ()); - results.push(result); - } - results - }); - - handles.push(handle); + let username = format!("race-user-{}", client_id); + client + .create_user(&username, USER_PASSWORD, UserStatus::Active, None) + .await + .map(|_| ()) + })); } - let all_results = join_all(handles).await; - all_results + join_all(handles) + .await .into_iter() - .flat_map(|r| r.expect("Tokio task panicked")) + .map(|r| r.expect("task panicked")) .collect() } -async fn execute_multiple_clients_users_cold( +async fn execute_users_cold( client_factory: &dyn ClientFactory, use_barrier: bool, ) -> Vec<OperationResult> { - let mut handles = Vec::with_capacity(MULTIPLE_CLIENT_COUNT); const DUPLICATE_USER: &str = "race-user-duplicate"; - let barrier = if use_barrier { - Some(Arc::new(Barrier::new(MULTIPLE_CLIENT_COUNT))) - } else { - None - }; + let barrier = use_barrier.then(|| Arc::new(Barrier::new(CONCURRENT_CLIENTS))); - for _ in 0..MULTIPLE_CLIENT_COUNT { + let mut handles = Vec::with_capacity(CONCURRENT_CLIENTS); + for _ in 0..CONCURRENT_CLIENTS { let client = create_client(client_factory).await; login_root(&client).await; + let barrier = barrier.clone(); - let barrier_clone = barrier.clone(); - let handle = tokio::spawn(async move { - if let Some(b) = barrier_clone { + handles.push(tokio::spawn(async move { + if let Some(b) = barrier { b.wait().await; } - - let mut results = Vec::with_capacity(MULTIPLE_CLIENT_COUNT); - for _ in 0..OPERATIONS_PER_CLIENT { - let result = client - .create_user(DUPLICATE_USER, USER_PASSWORD, UserStatus::Active, None) - .await - .map(|_| ()); - results.push(result); - } - results - }); - - handles.push(handle); + client + .create_user(DUPLICATE_USER, USER_PASSWORD, UserStatus::Active, None) + .await + .map(|_| ()) + })); } - let all_results = join_all(handles).await; - all_results + join_all(handles) + .await .into_iter() - .flat_map(|r| r.expect("Tokio task panicked")) + .map(|r| r.expect("task panicked")) .collect() } -async fn execute_multiple_clients_streams_hot( +async fn execute_streams_hot( client_factory: &dyn ClientFactory, use_barrier: bool, ) -> Vec<OperationResult> { - let mut handles = Vec::with_capacity(MULTIPLE_CLIENT_COUNT); - let barrier = if use_barrier { - Some(Arc::new(Barrier::new(MULTIPLE_CLIENT_COUNT))) - } else { - None - }; + let barrier = use_barrier.then(|| Arc::new(Barrier::new(CONCURRENT_CLIENTS))); - for client_id in 0..MULTIPLE_CLIENT_COUNT { + let mut handles = Vec::with_capacity(CONCURRENT_CLIENTS); + for client_id in 0..CONCURRENT_CLIENTS { let client = create_client(client_factory).await; login_root(&client).await; + let barrier = barrier.clone(); - let barrier_clone = barrier.clone(); - let handle = tokio::spawn(async move { - if let Some(b) = barrier_clone { + handles.push(tokio::spawn(async move { + if let Some(b) = barrier { b.wait().await; } - - let mut results = Vec::with_capacity(OPERATIONS_PER_CLIENT); - for i in 0..OPERATIONS_PER_CLIENT { - let stream_name = format!("race-stream-{}-{}", client_id, i); - let result = client.create_stream(&stream_name).await.map(|_| ()); - results.push(result); - } - results - }); - - handles.push(handle); + let stream_name = format!("race-stream-{}", client_id); + client.create_stream(&stream_name).await.map(|_| ()) + })); } - let all_results = join_all(handles).await; - all_results + join_all(handles) + .await .into_iter() - .flat_map(|r| r.expect("Tokio task panicked")) + .map(|r| r.expect("task panicked")) .collect() } -async fn execute_multiple_clients_streams_cold( +async fn execute_streams_cold( client_factory: &dyn ClientFactory, use_barrier: bool, ) -> Vec<OperationResult> { - let mut handles = Vec::with_capacity(MULTIPLE_CLIENT_COUNT); const DUPLICATE_STREAM: &str = "race-stream-duplicate"; - let barrier = if use_barrier { - Some(Arc::new(Barrier::new(MULTIPLE_CLIENT_COUNT))) - } else { - None - }; + let barrier = use_barrier.then(|| Arc::new(Barrier::new(CONCURRENT_CLIENTS))); - for _ in 0..MULTIPLE_CLIENT_COUNT { + let mut handles = Vec::with_capacity(CONCURRENT_CLIENTS); + for _ in 0..CONCURRENT_CLIENTS { let client = create_client(client_factory).await; login_root(&client).await; + let barrier = barrier.clone(); - let barrier_clone = barrier.clone(); - let handle = tokio::spawn(async move { - if let Some(b) = barrier_clone { + handles.push(tokio::spawn(async move { + if let Some(b) = barrier { b.wait().await; } - - let mut results = Vec::with_capacity(OPERATIONS_PER_CLIENT); - for _ in 0..OPERATIONS_PER_CLIENT { - let result = client.create_stream(DUPLICATE_STREAM).await.map(|_| ()); - results.push(result); - } - results - }); - - handles.push(handle); + client.create_stream(DUPLICATE_STREAM).await.map(|_| ()) + })); } - let all_results = join_all(handles).await; - all_results + join_all(handles) + .await .into_iter() - .flat_map(|r| r.expect("Tokio task panicked")) + .map(|r| r.expect("task panicked")) .collect() } -async fn execute_multiple_clients_topics_hot( +async fn execute_topics_hot( client_factory: &dyn ClientFactory, use_barrier: bool, ) -> Vec<OperationResult> { - let mut handles = Vec::with_capacity(MULTIPLE_CLIENT_COUNT); - let barrier = if use_barrier { - Some(Arc::new(Barrier::new(MULTIPLE_CLIENT_COUNT))) - } else { - None - }; + let barrier = use_barrier.then(|| Arc::new(Barrier::new(CONCURRENT_CLIENTS))); - for client_id in 0..MULTIPLE_CLIENT_COUNT { + let mut handles = Vec::with_capacity(CONCURRENT_CLIENTS); + for client_id in 0..CONCURRENT_CLIENTS { let client = create_client(client_factory).await; login_root(&client).await; + let barrier = barrier.clone(); - let barrier_clone = barrier.clone(); - let handle = tokio::spawn(async move { - if let Some(b) = barrier_clone { + handles.push(tokio::spawn(async move { + if let Some(b) = barrier { b.wait().await; } - - let mut results = Vec::with_capacity(OPERATIONS_PER_CLIENT); let stream_id = Identifier::named(TEST_STREAM_NAME).unwrap(); - - for i in 0..OPERATIONS_PER_CLIENT { - let topic_name = format!("race-topic-{}-{}", client_id, i); - let result = client - .create_topic( - &stream_id, - &topic_name, - PARTITIONS_COUNT, - CompressionAlgorithm::default(), - None, - IggyExpiry::NeverExpire, - MaxTopicSize::ServerDefault, - ) - .await - .map(|_| ()); - results.push(result); - } - results - }); - - handles.push(handle); + let topic_name = format!("race-topic-{}", client_id); + client + .create_topic( + &stream_id, + &topic_name, + PARTITIONS_COUNT, + CompressionAlgorithm::default(), + None, + IggyExpiry::NeverExpire, + MaxTopicSize::ServerDefault, + ) + .await + .map(|_| ()) + })); } - let all_results = join_all(handles).await; - all_results + join_all(handles) + .await .into_iter() - .flat_map(|r| r.expect("Tokio task panicked")) + .map(|r| r.expect("task panicked")) .collect() } -async fn execute_multiple_clients_topics_cold( +async fn execute_topics_cold( client_factory: &dyn ClientFactory, use_barrier: bool, ) -> Vec<OperationResult> { - let mut handles = Vec::with_capacity(MULTIPLE_CLIENT_COUNT); const DUPLICATE_TOPIC: &str = "race-topic-duplicate"; - let barrier = if use_barrier { - Some(Arc::new(Barrier::new(MULTIPLE_CLIENT_COUNT))) - } else { - None - }; + let barrier = use_barrier.then(|| Arc::new(Barrier::new(CONCURRENT_CLIENTS))); - for _ in 0..MULTIPLE_CLIENT_COUNT { + let mut handles = Vec::with_capacity(CONCURRENT_CLIENTS); + for _ in 0..CONCURRENT_CLIENTS { let client = create_client(client_factory).await; login_root(&client).await; + let barrier = barrier.clone(); - let barrier_clone = barrier.clone(); - let handle = tokio::spawn(async move { - if let Some(b) = barrier_clone { + handles.push(tokio::spawn(async move { + if let Some(b) = barrier { b.wait().await; } - - let mut results = Vec::with_capacity(OPERATIONS_PER_CLIENT); let stream_id = Identifier::named(TEST_STREAM_NAME).unwrap(); + client + .create_topic( + &stream_id, + DUPLICATE_TOPIC, + PARTITIONS_COUNT, + CompressionAlgorithm::default(), + None, + IggyExpiry::NeverExpire, + MaxTopicSize::ServerDefault, + ) + .await + .map(|_| ()) + })); + } - for _ in 0..OPERATIONS_PER_CLIENT { - let result = client - .create_topic( - &stream_id, - DUPLICATE_TOPIC, - PARTITIONS_COUNT, - CompressionAlgorithm::default(), - None, - IggyExpiry::NeverExpire, - MaxTopicSize::ServerDefault, - ) - .await - .map(|_| ()); - results.push(result); - } - results - }); + join_all(handles) + .await + .into_iter() + .map(|r| r.expect("task panicked")) + .collect() +} - handles.push(handle); +async fn execute_partitions_hot( + client_factory: &dyn ClientFactory, + use_barrier: bool, +) -> Vec<OperationResult> { + let barrier = use_barrier.then(|| Arc::new(Barrier::new(CONCURRENT_CLIENTS))); + + let mut handles = Vec::with_capacity(CONCURRENT_CLIENTS); + for _ in 0..CONCURRENT_CLIENTS { + let client = create_client(client_factory).await; + login_root(&client).await; + let barrier = barrier.clone(); + + handles.push(tokio::spawn(async move { + if let Some(b) = barrier { + b.wait().await; + } + let stream_id = Identifier::named(TEST_STREAM_NAME).unwrap(); + let topic_id = Identifier::named(TEST_TOPIC_NAME).unwrap(); + client + .create_partitions(&stream_id, &topic_id, 1) + .await + .map(|_| ()) + })); } - let all_results = join_all(handles).await; - all_results + join_all(handles) + .await .into_iter() - .flat_map(|r| r.expect("Tokio task panicked")) + .map(|r| r.expect("task panicked")) .collect() } -async fn execute_multiple_clients_partitions_hot( +async fn execute_consumer_groups_hot( client_factory: &dyn ClientFactory, use_barrier: bool, ) -> Vec<OperationResult> { - let mut handles = Vec::with_capacity(MULTIPLE_CLIENT_COUNT); - let barrier = if use_barrier { - Some(Arc::new(Barrier::new(MULTIPLE_CLIENT_COUNT))) - } else { - None - }; + let barrier = use_barrier.then(|| Arc::new(Barrier::new(CONCURRENT_CLIENTS))); - for _ in 0..MULTIPLE_CLIENT_COUNT { + let mut handles = Vec::with_capacity(CONCURRENT_CLIENTS); + for client_id in 0..CONCURRENT_CLIENTS { let client = create_client(client_factory).await; login_root(&client).await; + let barrier = barrier.clone(); - let barrier_clone = barrier.clone(); - let handle = tokio::spawn(async move { - if let Some(b) = barrier_clone { + handles.push(tokio::spawn(async move { + if let Some(b) = barrier { b.wait().await; } - - let mut results = Vec::with_capacity(OPERATIONS_PER_CLIENT); let stream_id = Identifier::named(TEST_STREAM_NAME).unwrap(); let topic_id = Identifier::named(TEST_TOPIC_NAME).unwrap(); + let group_name = format!("race-consumer-group-{}", client_id); + client + .create_consumer_group(&stream_id, &topic_id, &group_name) + .await + .map(|_| ()) + })); + } - for _ in 0..OPERATIONS_PER_CLIENT { - let result = client - .create_partitions(&stream_id, &topic_id, PARTITIONS_TO_ADD) - .await - .map(|_| ()); - results.push(result); - } - results - }); + join_all(handles) + .await + .into_iter() + .map(|r| r.expect("task panicked")) + .collect() +} + +async fn execute_consumer_groups_cold( + client_factory: &dyn ClientFactory, + use_barrier: bool, +) -> Vec<OperationResult> { + const DUPLICATE_CONSUMER_GROUP: &str = "race-consumer-group-duplicate"; + let barrier = use_barrier.then(|| Arc::new(Barrier::new(CONCURRENT_CLIENTS))); - handles.push(handle); + let mut handles = Vec::with_capacity(CONCURRENT_CLIENTS); + for _ in 0..CONCURRENT_CLIENTS { + let client = create_client(client_factory).await; + login_root(&client).await; + let barrier = barrier.clone(); + + handles.push(tokio::spawn(async move { + if let Some(b) = barrier { + b.wait().await; + } + let stream_id = Identifier::named(TEST_STREAM_NAME).unwrap(); + let topic_id = Identifier::named(TEST_TOPIC_NAME).unwrap(); + client + .create_consumer_group(&stream_id, &topic_id, DUPLICATE_CONSUMER_GROUP) + .await + .map(|_| ()) + })); } - let all_results = join_all(handles).await; - all_results + join_all(handles) + .await .into_iter() - .flat_map(|r| r.expect("Tokio task panicked")) + .map(|r| r.expect("task panicked")) .collect() } @@ -449,9 +431,9 @@ fn validate_results(results: &[OperationResult], scenario_type: ScenarioType) { // Hot path: all operations should succeed assert_eq!( success_count, - OPERATIONS_COUNT, + CONCURRENT_CLIENTS, "Hot path: Expected all {} operations to succeed, but only {} succeeded. Errors: {:?}", - OPERATIONS_COUNT, + CONCURRENT_CLIENTS, success_count, results.iter().filter(|r| r.is_err()).collect::<Vec<_>>() ); @@ -470,9 +452,9 @@ fn validate_results(results: &[OperationResult], scenario_type: ScenarioType) { ); assert_eq!( error_count, - OPERATIONS_COUNT - 1, + CONCURRENT_CLIENTS - 1, "Cold path: Expected {} errors, but got {}", - OPERATIONS_COUNT - 1, + CONCURRENT_CLIENTS - 1, error_count ); @@ -485,6 +467,7 @@ fn validate_results(results: &[OperationResult], scenario_type: ScenarioType) { IggyError::UserAlreadyExists | IggyError::StreamNameAlreadyExists(_) | IggyError::TopicNameAlreadyExists(_, _) + | IggyError::ConsumerGroupNameAlreadyExists(_, _) | IggyError::HttpResponseError(400, _) ), "Expected 'already exists' error, got: {:?}", @@ -624,6 +607,36 @@ async fn validate_server_state( ); } } + ResourceType::ConsumerGroup => { + let mut group_states = Vec::new(); + for client in &clients { + let state = validate_consumer_groups_state(client, scenario_type).await; + group_states.push(state); + } + + let first_state = &group_states[0]; + for (i, state) in group_states.iter().enumerate().skip(1) { + assert_eq!( + state.len(), + first_state.len(), + "Client {} sees different number of consumer groups ({}) than client 0 ({})", + i, + state.len(), + first_state.len() + ); + + let mut first_names: Vec<_> = first_state.iter().map(|g| &g.name).collect(); + let mut current_names: Vec<_> = state.iter().map(|g| &g.name).collect(); + first_names.sort(); + current_names.sort(); + + assert_eq!( + current_names, first_names, + "Client {} sees different consumer groups than client 0", + i + ); + } + } } } @@ -636,12 +649,12 @@ async fn validate_users_state(client: &IggyClient, scenario_type: ScenarioType) match scenario_type { ScenarioType::Hot => { - // Hot path: should have OPERATIONS_COUNT unique users + // Hot path: should have CONCURRENT_CLIENTS unique users assert_eq!( test_users.len(), - OPERATIONS_COUNT, + CONCURRENT_CLIENTS, "Hot path: Expected {} users, but found {}. Users: {:?}", - OPERATIONS_COUNT, + CONCURRENT_CLIENTS, test_users.len(), test_users.iter().map(|u| &u.username).collect::<Vec<_>>() ); @@ -697,12 +710,12 @@ async fn validate_streams_state(client: &IggyClient, scenario_type: ScenarioType match scenario_type { ScenarioType::Hot => { - // Hot path: should have OPERATIONS_COUNT unique streams + // Hot path: should have CONCURRENT_CLIENTS unique streams assert_eq!( test_streams.len(), - OPERATIONS_COUNT, + CONCURRENT_CLIENTS, "Hot path: Expected {} streams, but found {}. Streams: {:?}", - OPERATIONS_COUNT, + CONCURRENT_CLIENTS, test_streams.len(), test_streams.iter().map(|s| &s.name).collect::<Vec<_>>() ); @@ -765,12 +778,12 @@ async fn validate_topics_state(client: &IggyClient, scenario_type: ScenarioType) match scenario_type { ScenarioType::Hot => { - // Hot path: should have OPERATIONS_COUNT unique topics + // Hot path: should have CONCURRENT_CLIENTS unique topics assert_eq!( test_topics.len(), - OPERATIONS_COUNT, + CONCURRENT_CLIENTS, "Hot path: Expected {} topics, but found {}. Topics: {:?}", - OPERATIONS_COUNT, + CONCURRENT_CLIENTS, test_topics.len(), test_topics.iter().map(|t| &t.name).collect::<Vec<_>>() ); @@ -826,17 +839,82 @@ async fn validate_partitions_state(client: &IggyClient) -> u32 { .expect("Failed to get test topic") .expect("Test topic not found"); - // Expected: initial partition + OPERATIONS_COUNT added partitions - let expected_partitions = PARTITIONS_COUNT + (OPERATIONS_COUNT as u32 * PARTITIONS_TO_ADD); + // Expected: initial partition + CONCURRENT_CLIENTS added partitions (1 each) + let expected_partitions = PARTITIONS_COUNT + CONCURRENT_CLIENTS as u32; assert_eq!( topic.partitions_count, expected_partitions, - "Hot path: Expected {} partitions (1 initial + {} added), but found {}", - expected_partitions, OPERATIONS_COUNT, topic.partitions_count + "Expected {} partitions (1 initial + {} added), but found {}", + expected_partitions, CONCURRENT_CLIENTS, topic.partitions_count ); topic.partitions_count } +async fn validate_consumer_groups_state( + client: &IggyClient, + scenario_type: ScenarioType, +) -> Vec<ConsumerGroup> { + let stream_id = Identifier::named(TEST_STREAM_NAME).unwrap(); + let topic_id = Identifier::named(TEST_TOPIC_NAME).unwrap(); + let groups = client + .get_consumer_groups(&stream_id, &topic_id) + .await + .expect("Failed to get consumer groups"); + let test_groups: Vec<_> = groups + .into_iter() + .filter(|g| g.name.starts_with("race-consumer-group-")) + .collect(); + + match scenario_type { + ScenarioType::Hot => { + assert_eq!( + test_groups.len(), + CONCURRENT_CLIENTS, + "Hot path: Expected {} consumer groups, but found {}. Groups: {:?}", + CONCURRENT_CLIENTS, + test_groups.len(), + test_groups.iter().map(|g| &g.name).collect::<Vec<_>>() + ); + + let mut ids: Vec<u32> = test_groups.iter().map(|g| g.id).collect(); + ids.sort_unstable(); + let unique_ids: std::collections::HashSet<u32> = ids.iter().copied().collect(); + assert_eq!( + unique_ids.len(), + test_groups.len(), + "Hot path: Found duplicate consumer group IDs: {:?}", + ids + ); + + let names: std::collections::HashSet<&str> = + test_groups.iter().map(|g| g.name.as_str()).collect(); + assert_eq!( + names.len(), + test_groups.len(), + "Hot path: Found duplicate consumer group names" + ); + } + ScenarioType::Cold => { + assert_eq!( + test_groups.len(), + 1, + "Cold path: Expected exactly 1 consumer group, but found {}. Groups: {:?}", + test_groups.len(), + test_groups.iter().map(|g| &g.name).collect::<Vec<_>>() + ); + + let group = &test_groups[0]; + assert_eq!( + group.name, "race-consumer-group-duplicate", + "Cold path: Expected consumer group named 'race-consumer-group-duplicate', found '{}'", + group.name + ); + } + } + + test_groups +} + async fn cleanup_resources(client: &IggyClient, resource_type: ResourceType) { match resource_type { ResourceType::User => { @@ -852,13 +930,15 @@ async fn cleanup_resources(client: &IggyClient, resource_type: ResourceType) { ResourceType::Stream => { let streams = client.get_streams().await.unwrap(); for stream in streams { - let _ = client - .delete_stream(&Identifier::numeric(stream.id).unwrap()) - .await; + if stream.name.starts_with("race-stream-") { + let _ = client + .delete_stream(&Identifier::numeric(stream.id).unwrap()) + .await; + } } } - // Just delete test stream (also cleans up topics and partitions) - ResourceType::Topic | ResourceType::Partition => { + // Just delete test stream (also cleans up topics, partitions, and consumer groups) + ResourceType::Topic | ResourceType::Partition | ResourceType::ConsumerGroup => { let _ = client .delete_stream(&Identifier::named(TEST_STREAM_NAME).unwrap()) .await; diff --git a/core/integration/tests/server/scenarios/consumer_group_with_multiple_clients_polling_messages_scenario.rs b/core/integration/tests/server/scenarios/consumer_group_with_multiple_clients_polling_messages_scenario.rs index 6345061d2..076ecae25 100644 --- a/core/integration/tests/server/scenarios/consumer_group_with_multiple_clients_polling_messages_scenario.rs +++ b/core/integration/tests/server/scenarios/consumer_group_with_multiple_clients_polling_messages_scenario.rs @@ -24,7 +24,7 @@ use iggy::prelude::*; use integration::test_server::{ ClientFactory, assert_clean_system, create_user, login_root, login_user, }; -use std::str::FromStr; +use std::str::{FromStr, from_utf8}; pub async fn run(client_factory: &dyn ClientFactory) { let system_client = create_client(client_factory).await; @@ -132,6 +132,7 @@ async fn execute_using_messages_key_key( async fn poll_messages(client: &IggyClient) -> u32 { let consumer = Consumer::group(Identifier::named(CONSUMER_GROUP_NAME).unwrap()); let mut total_read_messages_count = 0; + let mut last_entity_id: Option<u32> = None; const MAX_RETRIES: u32 = 5; let mut total_retries = 0; @@ -163,12 +164,42 @@ async fn poll_messages(client: &IggyClient) -> u32 { tokio::time::sleep(tokio::time::Duration::from_millis(1)).await; }; - total_read_messages_count += polled_messages.messages.len() as u32; + if polled_messages.messages.is_empty() { + if total_retries >= MAX_RETRIES { + break; + } + continue; + } + + let message = &polled_messages.messages[0]; + let payload = from_utf8(&message.payload).unwrap(); + let entity_id = parse_entity_id_from_payload(payload); + + // Each client polls from a single partition; messages should arrive in send order + if let Some(last) = last_entity_id { + assert!( + entity_id > last, + "Messages out of order: {} should be > {}", + entity_id, + last + ); + } + last_entity_id = Some(entity_id); + + total_read_messages_count += 1; } total_read_messages_count } +fn parse_entity_id_from_payload(payload: &str) -> u32 { + payload + .strip_prefix("message-") + .expect("payload should start with 'message-'") + .parse() + .expect("entity_id should be a valid u32") +} + fn create_message_payload(entity_id: u32) -> String { format!("message-{entity_id}") } @@ -214,6 +245,7 @@ async fn execute_using_none_key( async fn validate_message_polling(client: &IggyClient) { let consumer = Consumer::group(Identifier::named(CONSUMER_GROUP_NAME).unwrap()); + let mut partition_id: Option<u32> = None; const MAX_RETRIES: u32 = 5; let mut total_retries = 0; @@ -256,6 +288,23 @@ async fn validate_message_polling(client: &IggyClient) { let message = &polled_messages.messages[0]; let offset = (i - 1) as u64; assert_eq!(message.header.offset, offset); + + let payload = from_utf8(&message.payload).unwrap(); + + if partition_id.is_none() { + partition_id = Some(parse_partition_id_from_extended_payload(payload)); + } + + // For balanced partitioning: entity_id = partition_id + offset * PARTITIONS_COUNT + let p = partition_id.unwrap(); + let expected_entity_id = p + (offset as u32) * PARTITIONS_COUNT; + let expected_payload = create_extended_message_payload(p, expected_entity_id); + + assert_eq!( + payload, expected_payload, + "Payload mismatch at offset {}: expected '{}', got '{}'", + offset, expected_payload, payload + ); } let polled_messages = client @@ -273,6 +322,13 @@ async fn validate_message_polling(client: &IggyClient) { assert!(polled_messages.messages.is_empty()) } +fn parse_partition_id_from_extended_payload(payload: &str) -> u32 { + let parts: Vec<&str> = payload.split('-').collect(); + parts[1] + .parse() + .expect("partition_id should be a valid u32") +} + fn create_extended_message_payload(partition_id: u32, entity_id: u32) -> String { format!("message-{partition_id}-{entity_id}") } diff --git a/core/integration/tests/server/scenarios/consumer_group_with_single_client_polling_messages_scenario.rs b/core/integration/tests/server/scenarios/consumer_group_with_single_client_polling_messages_scenario.rs index 5eece2dcb..166267390 100644 --- a/core/integration/tests/server/scenarios/consumer_group_with_single_client_polling_messages_scenario.rs +++ b/core/integration/tests/server/scenarios/consumer_group_with_single_client_polling_messages_scenario.rs @@ -22,6 +22,7 @@ use crate::server::scenarios::{ }; use iggy::prelude::*; use integration::test_server::{ClientFactory, assert_clean_system, login_root}; +use std::collections::HashSet; use std::str::{FromStr, from_utf8}; pub async fn run(client_factory: &dyn ClientFactory) { @@ -99,8 +100,11 @@ async fn execute_using_messages_key_key(client: &IggyClient) { } // 2. Poll the messages for the single client which has assigned all partitions in the consumer group + // Hash-based partitioning distributes messages unpredictably across partitions, and polling + // round-robins across partitions, so we verify completeness rather than order. let consumer = Consumer::group(Identifier::named(CONSUMER_GROUP_NAME).unwrap()); - let mut total_read_messages_count = 0; + let mut received_entity_ids = HashSet::new(); + for _ in 1..=PARTITIONS_COUNT * MESSAGES_COUNT { let polled_messages = client .poll_messages( @@ -115,10 +119,39 @@ async fn execute_using_messages_key_key(client: &IggyClient) { .await .unwrap(); - total_read_messages_count += polled_messages.messages.len() as u32; + for message in &polled_messages.messages { + let payload = from_utf8(&message.payload).unwrap(); + let entity_id = parse_entity_id_from_payload(payload); + assert!( + (1..=MESSAGES_COUNT).contains(&entity_id), + "entity_id {} out of expected range 1..={}", + entity_id, + MESSAGES_COUNT + ); + received_entity_ids.insert(entity_id); + } } - assert_eq!(total_read_messages_count, MESSAGES_COUNT); + assert_eq!( + received_entity_ids.len() as u32, + MESSAGES_COUNT, + "Expected {} unique messages, got {}", + MESSAGES_COUNT, + received_entity_ids.len() + ); + let expected: HashSet<u32> = (1..=MESSAGES_COUNT).collect(); + assert_eq!( + received_entity_ids, expected, + "Missing or duplicate messages" + ); +} + +fn parse_entity_id_from_payload(payload: &str) -> u32 { + payload + .strip_prefix("message-") + .expect("payload should start with 'message-'") + .parse() + .expect("entity_id should be a valid u32") } fn create_message_payload(entity_id: u32) -> String {
