This is an automated email from the ASF dual-hosted git repository.
hgruszecki pushed a commit to branch global-metadata-leftright
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/global-metadata-leftright by
this push:
new 8a1a24ced refactor concurrent scenario
8a1a24ced is described below
commit 8a1a24ced84f97828281af9d46d60c594d5bc99f
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Fri Jan 23 10:58:13 2026 +0100
refactor concurrent scenario
---
.../tests/server/concurrent_addition.rs | 15 +-
.../tests/server/scenarios/concurrent_scenario.rs | 560 ++++++++++++---------
2 files changed, 332 insertions(+), 243 deletions(-)
diff --git a/core/integration/tests/server/concurrent_addition.rs
b/core/integration/tests/server/concurrent_addition.rs
index c63681340..a1618f6b8 100644
--- a/core/integration/tests/server/concurrent_addition.rs
+++ b/core/integration/tests/server/concurrent_addition.rs
@@ -30,16 +30,17 @@ use test_case::test_matrix;
// Test matrix for race condition scenarios
// Tests all combinations of:
// - Transport: TCP, HTTP, QUIC, WebSocket (4)
-// - Resource: User, Stream, Topic (3)
+// - Resource: User, Stream, Topic, Partition, ConsumerGroup (5)
// - Path: Hot (unique names), Cold (duplicate names) (2)
// - Barrier: On (synchronized), Off (unsynchronized) (2)
-// Total: 4 × 3 × 2 × 2 = 48 test cases
+// Total: 4 × 5 × 2 × 2 = 80 test cases
+// Note: Partition + Cold is skipped (partitions don't have names)
// TODO: Websocket fails for the `cold` type, cold means that we are creating
resources with the same name.
// It fails with the error assertion, instead of `AlreadyExist`, we get
generic `Error`.
#[test_matrix(
[tcp(), http(), quic(), websocket()],
- [user(), stream(), topic()],
+ [user(), stream(), topic(), partition(), consumer_group()],
[hot(), cold()],
[barrier_on(), barrier_off()]
)]
@@ -119,6 +120,14 @@ fn topic() -> ResourceType {
ResourceType::Topic
}
+fn partition() -> ResourceType {
+ ResourceType::Partition
+}
+
+fn consumer_group() -> ResourceType {
+ ResourceType::ConsumerGroup
+}
+
fn hot() -> ScenarioType {
ScenarioType::Hot
}
diff --git a/core/integration/tests/server/scenarios/concurrent_scenario.rs
b/core/integration/tests/server/scenarios/concurrent_scenario.rs
index 5140b42b6..d42296ba2 100644
--- a/core/integration/tests/server/scenarios/concurrent_scenario.rs
+++ b/core/integration/tests/server/scenarios/concurrent_scenario.rs
@@ -19,19 +19,16 @@
use crate::server::scenarios::create_client;
use futures::future::join_all;
use iggy::prelude::*;
-use iggy_common::UserInfo;
+use iggy_common::{ConsumerGroup, UserInfo};
use integration::test_server::{ClientFactory, login_root};
use std::sync::Arc;
use tokio::sync::Barrier;
-const OPERATIONS_COUNT: usize = 40;
-const MULTIPLE_CLIENT_COUNT: usize = 10;
-const OPERATIONS_PER_CLIENT: usize = OPERATIONS_COUNT / MULTIPLE_CLIENT_COUNT;
+const CONCURRENT_CLIENTS: usize = 20;
const USER_PASSWORD: &str = "secret";
const TEST_STREAM_NAME: &str = "race-test-stream";
const TEST_TOPIC_NAME: &str = "race-test-topic";
const PARTITIONS_COUNT: u32 = 1;
-const PARTITIONS_TO_ADD: u32 = 1;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ResourceType {
@@ -39,7 +36,7 @@ pub enum ResourceType {
Stream,
Topic,
Partition,
- // TODO(hubcio): add ConsumerGroup
+ ConsumerGroup,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -72,8 +69,8 @@ pub async fn run(
root_client.create_stream(TEST_STREAM_NAME).await.unwrap();
}
- // For partition tests, create parent stream and topic first
- if resource_type == ResourceType::Partition {
+ // For partition and consumer group tests, create parent stream and topic
first
+ if resource_type == ResourceType::Partition || resource_type ==
ResourceType::ConsumerGroup {
root_client.create_stream(TEST_STREAM_NAME).await.unwrap();
let stream_id = Identifier::named(TEST_STREAM_NAME).unwrap();
root_client
@@ -91,29 +88,35 @@ pub async fn run(
}
let results = match (resource_type, scenario_type) {
- (ResourceType::User, ScenarioType::Cold) => {
- execute_multiple_clients_users_cold(client_factory,
use_barrier).await
- }
(ResourceType::User, ScenarioType::Hot) => {
- execute_multiple_clients_users_hot(client_factory,
use_barrier).await
+ execute_users_hot(client_factory, use_barrier).await
+ }
+ (ResourceType::User, ScenarioType::Cold) => {
+ execute_users_cold(client_factory, use_barrier).await
}
(ResourceType::Stream, ScenarioType::Hot) => {
- execute_multiple_clients_streams_hot(client_factory,
use_barrier).await
+ execute_streams_hot(client_factory, use_barrier).await
}
(ResourceType::Stream, ScenarioType::Cold) => {
- execute_multiple_clients_streams_cold(client_factory,
use_barrier).await
+ execute_streams_cold(client_factory, use_barrier).await
}
(ResourceType::Topic, ScenarioType::Hot) => {
- execute_multiple_clients_topics_hot(client_factory,
use_barrier).await
+ execute_topics_hot(client_factory, use_barrier).await
}
(ResourceType::Topic, ScenarioType::Cold) => {
- execute_multiple_clients_topics_cold(client_factory,
use_barrier).await
+ execute_topics_cold(client_factory, use_barrier).await
}
(ResourceType::Partition, ScenarioType::Hot) => {
- execute_multiple_clients_partitions_hot(client_factory,
use_barrier).await
+ execute_partitions_hot(client_factory, use_barrier).await
}
// Partitions don't have names, so Cold scenario doesn't apply
(ResourceType::Partition, ScenarioType::Cold) => vec![],
+ (ResourceType::ConsumerGroup, ScenarioType::Hot) => {
+ execute_consumer_groups_hot(client_factory, use_barrier).await
+ }
+ (ResourceType::ConsumerGroup, ScenarioType::Cold) => {
+ execute_consumer_groups_cold(client_factory, use_barrier).await
+ }
};
if !results.is_empty() {
@@ -123,320 +126,299 @@ pub async fn run(
}
}
-async fn execute_multiple_clients_users_hot(
+async fn execute_users_hot(
client_factory: &dyn ClientFactory,
use_barrier: bool,
) -> Vec<OperationResult> {
- let mut handles = Vec::with_capacity(MULTIPLE_CLIENT_COUNT);
- let barrier = if use_barrier {
- Some(Arc::new(Barrier::new(MULTIPLE_CLIENT_COUNT)))
- } else {
- None
- };
+ let barrier = use_barrier.then(||
Arc::new(Barrier::new(CONCURRENT_CLIENTS)));
- for client_id in 0..MULTIPLE_CLIENT_COUNT {
+ let mut handles = Vec::with_capacity(CONCURRENT_CLIENTS);
+ for client_id in 0..CONCURRENT_CLIENTS {
let client = create_client(client_factory).await;
login_root(&client).await;
+ let barrier = barrier.clone();
- let barrier_clone = barrier.clone();
- let handle = tokio::spawn(async move {
- if let Some(b) = barrier_clone {
+ handles.push(tokio::spawn(async move {
+ if let Some(b) = barrier {
b.wait().await;
}
-
- let mut results = Vec::with_capacity(OPERATIONS_PER_CLIENT);
- for i in 0..OPERATIONS_PER_CLIENT {
- let username = format!("race-user-{}-{}", client_id, i);
- let result = client
- .create_user(&username, USER_PASSWORD, UserStatus::Active,
None)
- .await
- .map(|_| ());
- results.push(result);
- }
- results
- });
-
- handles.push(handle);
+ let username = format!("race-user-{}", client_id);
+ client
+ .create_user(&username, USER_PASSWORD, UserStatus::Active,
None)
+ .await
+ .map(|_| ())
+ }));
}
- let all_results = join_all(handles).await;
- all_results
+ join_all(handles)
+ .await
.into_iter()
- .flat_map(|r| r.expect("Tokio task panicked"))
+ .map(|r| r.expect("task panicked"))
.collect()
}
-async fn execute_multiple_clients_users_cold(
+async fn execute_users_cold(
client_factory: &dyn ClientFactory,
use_barrier: bool,
) -> Vec<OperationResult> {
- let mut handles = Vec::with_capacity(MULTIPLE_CLIENT_COUNT);
const DUPLICATE_USER: &str = "race-user-duplicate";
- let barrier = if use_barrier {
- Some(Arc::new(Barrier::new(MULTIPLE_CLIENT_COUNT)))
- } else {
- None
- };
+ let barrier = use_barrier.then(||
Arc::new(Barrier::new(CONCURRENT_CLIENTS)));
- for _ in 0..MULTIPLE_CLIENT_COUNT {
+ let mut handles = Vec::with_capacity(CONCURRENT_CLIENTS);
+ for _ in 0..CONCURRENT_CLIENTS {
let client = create_client(client_factory).await;
login_root(&client).await;
+ let barrier = barrier.clone();
- let barrier_clone = barrier.clone();
- let handle = tokio::spawn(async move {
- if let Some(b) = barrier_clone {
+ handles.push(tokio::spawn(async move {
+ if let Some(b) = barrier {
b.wait().await;
}
-
- let mut results = Vec::with_capacity(MULTIPLE_CLIENT_COUNT);
- for _ in 0..OPERATIONS_PER_CLIENT {
- let result = client
- .create_user(DUPLICATE_USER, USER_PASSWORD,
UserStatus::Active, None)
- .await
- .map(|_| ());
- results.push(result);
- }
- results
- });
-
- handles.push(handle);
+ client
+ .create_user(DUPLICATE_USER, USER_PASSWORD,
UserStatus::Active, None)
+ .await
+ .map(|_| ())
+ }));
}
- let all_results = join_all(handles).await;
- all_results
+ join_all(handles)
+ .await
.into_iter()
- .flat_map(|r| r.expect("Tokio task panicked"))
+ .map(|r| r.expect("task panicked"))
.collect()
}
-async fn execute_multiple_clients_streams_hot(
+async fn execute_streams_hot(
client_factory: &dyn ClientFactory,
use_barrier: bool,
) -> Vec<OperationResult> {
- let mut handles = Vec::with_capacity(MULTIPLE_CLIENT_COUNT);
- let barrier = if use_barrier {
- Some(Arc::new(Barrier::new(MULTIPLE_CLIENT_COUNT)))
- } else {
- None
- };
+ let barrier = use_barrier.then(||
Arc::new(Barrier::new(CONCURRENT_CLIENTS)));
- for client_id in 0..MULTIPLE_CLIENT_COUNT {
+ let mut handles = Vec::with_capacity(CONCURRENT_CLIENTS);
+ for client_id in 0..CONCURRENT_CLIENTS {
let client = create_client(client_factory).await;
login_root(&client).await;
+ let barrier = barrier.clone();
- let barrier_clone = barrier.clone();
- let handle = tokio::spawn(async move {
- if let Some(b) = barrier_clone {
+ handles.push(tokio::spawn(async move {
+ if let Some(b) = barrier {
b.wait().await;
}
-
- let mut results = Vec::with_capacity(OPERATIONS_PER_CLIENT);
- for i in 0..OPERATIONS_PER_CLIENT {
- let stream_name = format!("race-stream-{}-{}", client_id, i);
- let result = client.create_stream(&stream_name).await.map(|_|
());
- results.push(result);
- }
- results
- });
-
- handles.push(handle);
+ let stream_name = format!("race-stream-{}", client_id);
+ client.create_stream(&stream_name).await.map(|_| ())
+ }));
}
- let all_results = join_all(handles).await;
- all_results
+ join_all(handles)
+ .await
.into_iter()
- .flat_map(|r| r.expect("Tokio task panicked"))
+ .map(|r| r.expect("task panicked"))
.collect()
}
-async fn execute_multiple_clients_streams_cold(
+async fn execute_streams_cold(
client_factory: &dyn ClientFactory,
use_barrier: bool,
) -> Vec<OperationResult> {
- let mut handles = Vec::with_capacity(MULTIPLE_CLIENT_COUNT);
const DUPLICATE_STREAM: &str = "race-stream-duplicate";
- let barrier = if use_barrier {
- Some(Arc::new(Barrier::new(MULTIPLE_CLIENT_COUNT)))
- } else {
- None
- };
+ let barrier = use_barrier.then(||
Arc::new(Barrier::new(CONCURRENT_CLIENTS)));
- for _ in 0..MULTIPLE_CLIENT_COUNT {
+ let mut handles = Vec::with_capacity(CONCURRENT_CLIENTS);
+ for _ in 0..CONCURRENT_CLIENTS {
let client = create_client(client_factory).await;
login_root(&client).await;
+ let barrier = barrier.clone();
- let barrier_clone = barrier.clone();
- let handle = tokio::spawn(async move {
- if let Some(b) = barrier_clone {
+ handles.push(tokio::spawn(async move {
+ if let Some(b) = barrier {
b.wait().await;
}
-
- let mut results = Vec::with_capacity(OPERATIONS_PER_CLIENT);
- for _ in 0..OPERATIONS_PER_CLIENT {
- let result =
client.create_stream(DUPLICATE_STREAM).await.map(|_| ());
- results.push(result);
- }
- results
- });
-
- handles.push(handle);
+ client.create_stream(DUPLICATE_STREAM).await.map(|_| ())
+ }));
}
- let all_results = join_all(handles).await;
- all_results
+ join_all(handles)
+ .await
.into_iter()
- .flat_map(|r| r.expect("Tokio task panicked"))
+ .map(|r| r.expect("task panicked"))
.collect()
}
-async fn execute_multiple_clients_topics_hot(
+async fn execute_topics_hot(
client_factory: &dyn ClientFactory,
use_barrier: bool,
) -> Vec<OperationResult> {
- let mut handles = Vec::with_capacity(MULTIPLE_CLIENT_COUNT);
- let barrier = if use_barrier {
- Some(Arc::new(Barrier::new(MULTIPLE_CLIENT_COUNT)))
- } else {
- None
- };
+ let barrier = use_barrier.then(||
Arc::new(Barrier::new(CONCURRENT_CLIENTS)));
- for client_id in 0..MULTIPLE_CLIENT_COUNT {
+ let mut handles = Vec::with_capacity(CONCURRENT_CLIENTS);
+ for client_id in 0..CONCURRENT_CLIENTS {
let client = create_client(client_factory).await;
login_root(&client).await;
+ let barrier = barrier.clone();
- let barrier_clone = barrier.clone();
- let handle = tokio::spawn(async move {
- if let Some(b) = barrier_clone {
+ handles.push(tokio::spawn(async move {
+ if let Some(b) = barrier {
b.wait().await;
}
-
- let mut results = Vec::with_capacity(OPERATIONS_PER_CLIENT);
let stream_id = Identifier::named(TEST_STREAM_NAME).unwrap();
-
- for i in 0..OPERATIONS_PER_CLIENT {
- let topic_name = format!("race-topic-{}-{}", client_id, i);
- let result = client
- .create_topic(
- &stream_id,
- &topic_name,
- PARTITIONS_COUNT,
- CompressionAlgorithm::default(),
- None,
- IggyExpiry::NeverExpire,
- MaxTopicSize::ServerDefault,
- )
- .await
- .map(|_| ());
- results.push(result);
- }
- results
- });
-
- handles.push(handle);
+ let topic_name = format!("race-topic-{}", client_id);
+ client
+ .create_topic(
+ &stream_id,
+ &topic_name,
+ PARTITIONS_COUNT,
+ CompressionAlgorithm::default(),
+ None,
+ IggyExpiry::NeverExpire,
+ MaxTopicSize::ServerDefault,
+ )
+ .await
+ .map(|_| ())
+ }));
}
- let all_results = join_all(handles).await;
- all_results
+ join_all(handles)
+ .await
.into_iter()
- .flat_map(|r| r.expect("Tokio task panicked"))
+ .map(|r| r.expect("task panicked"))
.collect()
}
-async fn execute_multiple_clients_topics_cold(
+async fn execute_topics_cold(
client_factory: &dyn ClientFactory,
use_barrier: bool,
) -> Vec<OperationResult> {
- let mut handles = Vec::with_capacity(MULTIPLE_CLIENT_COUNT);
const DUPLICATE_TOPIC: &str = "race-topic-duplicate";
- let barrier = if use_barrier {
- Some(Arc::new(Barrier::new(MULTIPLE_CLIENT_COUNT)))
- } else {
- None
- };
+ let barrier = use_barrier.then(||
Arc::new(Barrier::new(CONCURRENT_CLIENTS)));
- for _ in 0..MULTIPLE_CLIENT_COUNT {
+ let mut handles = Vec::with_capacity(CONCURRENT_CLIENTS);
+ for _ in 0..CONCURRENT_CLIENTS {
let client = create_client(client_factory).await;
login_root(&client).await;
+ let barrier = barrier.clone();
- let barrier_clone = barrier.clone();
- let handle = tokio::spawn(async move {
- if let Some(b) = barrier_clone {
+ handles.push(tokio::spawn(async move {
+ if let Some(b) = barrier {
b.wait().await;
}
-
- let mut results = Vec::with_capacity(OPERATIONS_PER_CLIENT);
let stream_id = Identifier::named(TEST_STREAM_NAME).unwrap();
+ client
+ .create_topic(
+ &stream_id,
+ DUPLICATE_TOPIC,
+ PARTITIONS_COUNT,
+ CompressionAlgorithm::default(),
+ None,
+ IggyExpiry::NeverExpire,
+ MaxTopicSize::ServerDefault,
+ )
+ .await
+ .map(|_| ())
+ }));
+ }
- for _ in 0..OPERATIONS_PER_CLIENT {
- let result = client
- .create_topic(
- &stream_id,
- DUPLICATE_TOPIC,
- PARTITIONS_COUNT,
- CompressionAlgorithm::default(),
- None,
- IggyExpiry::NeverExpire,
- MaxTopicSize::ServerDefault,
- )
- .await
- .map(|_| ());
- results.push(result);
- }
- results
- });
+ join_all(handles)
+ .await
+ .into_iter()
+ .map(|r| r.expect("task panicked"))
+ .collect()
+}
- handles.push(handle);
+async fn execute_partitions_hot(
+ client_factory: &dyn ClientFactory,
+ use_barrier: bool,
+) -> Vec<OperationResult> {
+ let barrier = use_barrier.then(||
Arc::new(Barrier::new(CONCURRENT_CLIENTS)));
+
+ let mut handles = Vec::with_capacity(CONCURRENT_CLIENTS);
+ for _ in 0..CONCURRENT_CLIENTS {
+ let client = create_client(client_factory).await;
+ login_root(&client).await;
+ let barrier = barrier.clone();
+
+ handles.push(tokio::spawn(async move {
+ if let Some(b) = barrier {
+ b.wait().await;
+ }
+ let stream_id = Identifier::named(TEST_STREAM_NAME).unwrap();
+ let topic_id = Identifier::named(TEST_TOPIC_NAME).unwrap();
+ client
+ .create_partitions(&stream_id, &topic_id, 1)
+ .await
+ .map(|_| ())
+ }));
}
- let all_results = join_all(handles).await;
- all_results
+ join_all(handles)
+ .await
.into_iter()
- .flat_map(|r| r.expect("Tokio task panicked"))
+ .map(|r| r.expect("task panicked"))
.collect()
}
-async fn execute_multiple_clients_partitions_hot(
+async fn execute_consumer_groups_hot(
client_factory: &dyn ClientFactory,
use_barrier: bool,
) -> Vec<OperationResult> {
- let mut handles = Vec::with_capacity(MULTIPLE_CLIENT_COUNT);
- let barrier = if use_barrier {
- Some(Arc::new(Barrier::new(MULTIPLE_CLIENT_COUNT)))
- } else {
- None
- };
+ let barrier = use_barrier.then(||
Arc::new(Barrier::new(CONCURRENT_CLIENTS)));
- for _ in 0..MULTIPLE_CLIENT_COUNT {
+ let mut handles = Vec::with_capacity(CONCURRENT_CLIENTS);
+ for client_id in 0..CONCURRENT_CLIENTS {
let client = create_client(client_factory).await;
login_root(&client).await;
+ let barrier = barrier.clone();
- let barrier_clone = barrier.clone();
- let handle = tokio::spawn(async move {
- if let Some(b) = barrier_clone {
+ handles.push(tokio::spawn(async move {
+ if let Some(b) = barrier {
b.wait().await;
}
-
- let mut results = Vec::with_capacity(OPERATIONS_PER_CLIENT);
let stream_id = Identifier::named(TEST_STREAM_NAME).unwrap();
let topic_id = Identifier::named(TEST_TOPIC_NAME).unwrap();
+ let group_name = format!("race-consumer-group-{}", client_id);
+ client
+ .create_consumer_group(&stream_id, &topic_id, &group_name)
+ .await
+ .map(|_| ())
+ }));
+ }
- for _ in 0..OPERATIONS_PER_CLIENT {
- let result = client
- .create_partitions(&stream_id, &topic_id,
PARTITIONS_TO_ADD)
- .await
- .map(|_| ());
- results.push(result);
- }
- results
- });
+ join_all(handles)
+ .await
+ .into_iter()
+ .map(|r| r.expect("task panicked"))
+ .collect()
+}
+
+async fn execute_consumer_groups_cold(
+ client_factory: &dyn ClientFactory,
+ use_barrier: bool,
+) -> Vec<OperationResult> {
+ const DUPLICATE_CONSUMER_GROUP: &str = "race-consumer-group-duplicate";
+ let barrier = use_barrier.then(||
Arc::new(Barrier::new(CONCURRENT_CLIENTS)));
- handles.push(handle);
+ let mut handles = Vec::with_capacity(CONCURRENT_CLIENTS);
+ for _ in 0..CONCURRENT_CLIENTS {
+ let client = create_client(client_factory).await;
+ login_root(&client).await;
+ let barrier = barrier.clone();
+
+ handles.push(tokio::spawn(async move {
+ if let Some(b) = barrier {
+ b.wait().await;
+ }
+ let stream_id = Identifier::named(TEST_STREAM_NAME).unwrap();
+ let topic_id = Identifier::named(TEST_TOPIC_NAME).unwrap();
+ client
+ .create_consumer_group(&stream_id, &topic_id,
DUPLICATE_CONSUMER_GROUP)
+ .await
+ .map(|_| ())
+ }));
}
- let all_results = join_all(handles).await;
- all_results
+ join_all(handles)
+ .await
.into_iter()
- .flat_map(|r| r.expect("Tokio task panicked"))
+ .map(|r| r.expect("task panicked"))
.collect()
}
@@ -449,9 +431,9 @@ fn validate_results(results: &[OperationResult],
scenario_type: ScenarioType) {
// Hot path: all operations should succeed
assert_eq!(
success_count,
- OPERATIONS_COUNT,
+ CONCURRENT_CLIENTS,
"Hot path: Expected all {} operations to succeed, but only {}
succeeded. Errors: {:?}",
- OPERATIONS_COUNT,
+ CONCURRENT_CLIENTS,
success_count,
results.iter().filter(|r| r.is_err()).collect::<Vec<_>>()
);
@@ -470,9 +452,9 @@ fn validate_results(results: &[OperationResult],
scenario_type: ScenarioType) {
);
assert_eq!(
error_count,
- OPERATIONS_COUNT - 1,
+ CONCURRENT_CLIENTS - 1,
"Cold path: Expected {} errors, but got {}",
- OPERATIONS_COUNT - 1,
+ CONCURRENT_CLIENTS - 1,
error_count
);
@@ -485,6 +467,7 @@ fn validate_results(results: &[OperationResult],
scenario_type: ScenarioType) {
IggyError::UserAlreadyExists
| IggyError::StreamNameAlreadyExists(_)
| IggyError::TopicNameAlreadyExists(_, _)
+ | IggyError::ConsumerGroupNameAlreadyExists(_, _)
| IggyError::HttpResponseError(400, _)
),
"Expected 'already exists' error, got: {:?}",
@@ -624,6 +607,36 @@ async fn validate_server_state(
);
}
}
+ ResourceType::ConsumerGroup => {
+ let mut group_states = Vec::new();
+ for client in &clients {
+ let state = validate_consumer_groups_state(client,
scenario_type).await;
+ group_states.push(state);
+ }
+
+ let first_state = &group_states[0];
+ for (i, state) in group_states.iter().enumerate().skip(1) {
+ assert_eq!(
+ state.len(),
+ first_state.len(),
+ "Client {} sees different number of consumer groups ({})
than client 0 ({})",
+ i,
+ state.len(),
+ first_state.len()
+ );
+
+ let mut first_names: Vec<_> = first_state.iter().map(|g|
&g.name).collect();
+ let mut current_names: Vec<_> = state.iter().map(|g|
&g.name).collect();
+ first_names.sort();
+ current_names.sort();
+
+ assert_eq!(
+ current_names, first_names,
+ "Client {} sees different consumer groups than client 0",
+ i
+ );
+ }
+ }
}
}
@@ -636,12 +649,12 @@ async fn validate_users_state(client: &IggyClient,
scenario_type: ScenarioType)
match scenario_type {
ScenarioType::Hot => {
- // Hot path: should have OPERATIONS_COUNT unique users
+ // Hot path: should have CONCURRENT_CLIENTS unique users
assert_eq!(
test_users.len(),
- OPERATIONS_COUNT,
+ CONCURRENT_CLIENTS,
"Hot path: Expected {} users, but found {}. Users: {:?}",
- OPERATIONS_COUNT,
+ CONCURRENT_CLIENTS,
test_users.len(),
test_users.iter().map(|u| &u.username).collect::<Vec<_>>()
);
@@ -697,12 +710,12 @@ async fn validate_streams_state(client: &IggyClient,
scenario_type: ScenarioType
match scenario_type {
ScenarioType::Hot => {
- // Hot path: should have OPERATIONS_COUNT unique streams
+ // Hot path: should have CONCURRENT_CLIENTS unique streams
assert_eq!(
test_streams.len(),
- OPERATIONS_COUNT,
+ CONCURRENT_CLIENTS,
"Hot path: Expected {} streams, but found {}. Streams: {:?}",
- OPERATIONS_COUNT,
+ CONCURRENT_CLIENTS,
test_streams.len(),
test_streams.iter().map(|s| &s.name).collect::<Vec<_>>()
);
@@ -765,12 +778,12 @@ async fn validate_topics_state(client: &IggyClient,
scenario_type: ScenarioType)
match scenario_type {
ScenarioType::Hot => {
- // Hot path: should have OPERATIONS_COUNT unique topics
+ // Hot path: should have CONCURRENT_CLIENTS unique topics
assert_eq!(
test_topics.len(),
- OPERATIONS_COUNT,
+ CONCURRENT_CLIENTS,
"Hot path: Expected {} topics, but found {}. Topics: {:?}",
- OPERATIONS_COUNT,
+ CONCURRENT_CLIENTS,
test_topics.len(),
test_topics.iter().map(|t| &t.name).collect::<Vec<_>>()
);
@@ -826,17 +839,82 @@ async fn validate_partitions_state(client: &IggyClient)
-> u32 {
.expect("Failed to get test topic")
.expect("Test topic not found");
- // Expected: initial partition + OPERATIONS_COUNT added partitions
- let expected_partitions = PARTITIONS_COUNT + (OPERATIONS_COUNT as u32 *
PARTITIONS_TO_ADD);
+ // Expected: initial partition + CONCURRENT_CLIENTS added partitions (1
each)
+ let expected_partitions = PARTITIONS_COUNT + CONCURRENT_CLIENTS as u32;
assert_eq!(
topic.partitions_count, expected_partitions,
- "Hot path: Expected {} partitions (1 initial + {} added), but found
{}",
- expected_partitions, OPERATIONS_COUNT, topic.partitions_count
+ "Expected {} partitions (1 initial + {} added), but found {}",
+ expected_partitions, CONCURRENT_CLIENTS, topic.partitions_count
);
topic.partitions_count
}
+async fn validate_consumer_groups_state(
+ client: &IggyClient,
+ scenario_type: ScenarioType,
+) -> Vec<ConsumerGroup> {
+ let stream_id = Identifier::named(TEST_STREAM_NAME).unwrap();
+ let topic_id = Identifier::named(TEST_TOPIC_NAME).unwrap();
+ let groups = client
+ .get_consumer_groups(&stream_id, &topic_id)
+ .await
+ .expect("Failed to get consumer groups");
+ let test_groups: Vec<_> = groups
+ .into_iter()
+ .filter(|g| g.name.starts_with("race-consumer-group-"))
+ .collect();
+
+ match scenario_type {
+ ScenarioType::Hot => {
+ assert_eq!(
+ test_groups.len(),
+ CONCURRENT_CLIENTS,
+ "Hot path: Expected {} consumer groups, but found {}. Groups:
{:?}",
+ CONCURRENT_CLIENTS,
+ test_groups.len(),
+ test_groups.iter().map(|g| &g.name).collect::<Vec<_>>()
+ );
+
+ let mut ids: Vec<u32> = test_groups.iter().map(|g| g.id).collect();
+ ids.sort_unstable();
+ let unique_ids: std::collections::HashSet<u32> =
ids.iter().copied().collect();
+ assert_eq!(
+ unique_ids.len(),
+ test_groups.len(),
+ "Hot path: Found duplicate consumer group IDs: {:?}",
+ ids
+ );
+
+ let names: std::collections::HashSet<&str> =
+ test_groups.iter().map(|g| g.name.as_str()).collect();
+ assert_eq!(
+ names.len(),
+ test_groups.len(),
+ "Hot path: Found duplicate consumer group names"
+ );
+ }
+ ScenarioType::Cold => {
+ assert_eq!(
+ test_groups.len(),
+ 1,
+ "Cold path: Expected exactly 1 consumer group, but found {}.
Groups: {:?}",
+ test_groups.len(),
+ test_groups.iter().map(|g| &g.name).collect::<Vec<_>>()
+ );
+
+ let group = &test_groups[0];
+ assert_eq!(
+ group.name, "race-consumer-group-duplicate",
+ "Cold path: Expected consumer group named
'race-consumer-group-duplicate', found '{}'",
+ group.name
+ );
+ }
+ }
+
+ test_groups
+}
+
async fn cleanup_resources(client: &IggyClient, resource_type: ResourceType) {
match resource_type {
ResourceType::User => {
@@ -852,13 +930,15 @@ async fn cleanup_resources(client: &IggyClient,
resource_type: ResourceType) {
ResourceType::Stream => {
let streams = client.get_streams().await.unwrap();
for stream in streams {
- let _ = client
- .delete_stream(&Identifier::numeric(stream.id).unwrap())
- .await;
+ if stream.name.starts_with("race-stream-") {
+ let _ = client
+
.delete_stream(&Identifier::numeric(stream.id).unwrap())
+ .await;
+ }
}
}
- // Just delete test stream (also cleans up topics and partitions)
- ResourceType::Topic | ResourceType::Partition => {
+ // Just delete test stream (also cleans up topics, partitions, and
consumer groups)
+ ResourceType::Topic | ResourceType::Partition |
ResourceType::ConsumerGroup => {
let _ = client
.delete_stream(&Identifier::named(TEST_STREAM_NAME).unwrap())
.await;