This is an automated email from the ASF dual-hosted git repository. piotr pushed a commit to branch collaborative_rebalancing in repository https://gitbox.apache.org/repos/asf/iggy.git
commit f515dc5b7158e5aa4a5033b0729e5859be51de4a Author: spetz <[email protected]> AuthorDate: Fri Feb 13 12:44:48 2026 +0100 cleanup --- .../tests/connectors/postgres/postgres_source.rs | 2 +- .../verify_consumer_group_partition_assignment.rs | 1589 +++++++++++++++++++- core/server/src/bootstrap.rs | 1 - core/server/src/metadata/absorb.rs | 20 +- core/server/src/metadata/consumer_group.rs | 42 +- core/server/src/metadata/consumer_group_member.rs | 1 + core/server/src/metadata/inner.rs | 4 - core/server/src/metadata/ops.rs | 1 + core/server/src/metadata/reader.rs | 103 +- core/server/src/metadata/writer.rs | 3 + core/server/src/shard/handlers.rs | 2 + core/server/src/shard/system/consumer_groups.rs | 1 + core/server/src/shard/system/consumer_offsets.rs | 9 +- core/server/src/shard/system/messages.rs | 4 +- .../src/shard/tasks/periodic/revocation_timeout.rs | 1 + core/server/src/shard/transmission/message.rs | 1 + 16 files changed, 1648 insertions(+), 136 deletions(-) diff --git a/core/integration/tests/connectors/postgres/postgres_source.rs b/core/integration/tests/connectors/postgres/postgres_source.rs index de8bc391a..5d322df36 100644 --- a/core/integration/tests/connectors/postgres/postgres_source.rs +++ b/core/integration/tests/connectors/postgres/postgres_source.rs @@ -497,7 +497,7 @@ async fn state_persists_across_connector_restart( .start_dependents() .await .expect("Failed to restart connectors"); - sleep(Duration::from_millis(500)).await; + sleep(Duration::from_secs(2)).await; let mut received_after: Vec<DatabaseRecord> = Vec::new(); for _ in 0..POLL_ATTEMPTS { diff --git a/core/integration/tests/data_integrity/verify_consumer_group_partition_assignment.rs b/core/integration/tests/data_integrity/verify_consumer_group_partition_assignment.rs index fda75f8da..72d5d38a4 100644 --- a/core/integration/tests/data_integrity/verify_consumer_group_partition_assignment.rs +++ b/core/integration/tests/data_integrity/verify_consumer_group_partition_assignment.rs @@ -21,6 +21,7 @@ use integration::iggy_harness; use std::collections::HashSet; use std::str::FromStr; use std::sync::Arc; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use tokio::time::{Duration, sleep}; const STREAM_NAME: &str = "cg-partition-test-stream"; @@ -326,6 +327,1371 @@ async fn should_not_reshuffle_partitions_when_new_member_joins(harness: &TestHar .unwrap(); } +#[iggy_harness(test_client_transport = [Tcp, WebSocket, Quic], server( + heartbeat.enabled = true, + heartbeat.interval = "2s", + tcp.socket.override_defaults = true, + tcp.socket.nodelay = true +))] +async fn should_not_lose_messages_with_concurrent_polls_during_partition_add( + harness: &TestHarness, +) { + let root_client = harness.root_client().await.unwrap(); + + setup_stream_topic_cg(&root_client).await; + + // Send 10 messages per partition + for partition_id in 0..PARTITIONS_COUNT { + for i in 0..10u32 { + let message = IggyMessage::from_str(&format!("p{partition_id}-msg{i}")).unwrap(); + let mut messages = vec![message]; + root_client + .send_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + &Partitioning::partition_id(partition_id), + &mut messages, + ) + .await + .unwrap(); + } + } + + // 1. Two consumers join + let client1 = Arc::new(harness.new_client().await.unwrap()); + let client2 = Arc::new(harness.new_client().await.unwrap()); + for client in [&*client1, &*client2] { + client + .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD) + .await + .unwrap(); + join_cg(client).await; + } + + sleep(Duration::from_millis(200)).await; + + // 2. Start concurrent poll loops + let stop = Arc::new(AtomicBool::new(false)); + let total_polled_1 = Arc::new(AtomicU64::new(0)); + let total_polled_2 = Arc::new(AtomicU64::new(0)); + + let poll_task_1 = { + let client = client1.clone(); + let stop = stop.clone(); + let count = total_polled_1.clone(); + tokio::spawn(async move { + let consumer = Consumer::group(Identifier::named(CONSUMER_GROUP_NAME).unwrap()); + while !stop.load(Ordering::Relaxed) { + let polled = client + .poll_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + None, + &consumer, + &PollingStrategy::next(), + 1, + true, + ) + .await; + if let Ok(polled) = polled { + count.fetch_add(polled.messages.len() as u64, Ordering::Relaxed); + } + sleep(Duration::from_millis(10)).await; + } + }) + }; + + let poll_task_2 = { + let client = client2.clone(); + let stop = stop.clone(); + let count = total_polled_2.clone(); + tokio::spawn(async move { + let consumer = Consumer::group(Identifier::named(CONSUMER_GROUP_NAME).unwrap()); + while !stop.load(Ordering::Relaxed) { + let polled = client + .poll_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + None, + &consumer, + &PollingStrategy::next(), + 1, + true, + ) + .await; + if let Ok(polled) = polled { + count.fetch_add(polled.messages.len() as u64, Ordering::Relaxed); + } + sleep(Duration::from_millis(10)).await; + } + }) + }; + + // 3. While polling, add 3 partitions + sleep(Duration::from_millis(100)).await; + root_client + .create_partitions( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + 3, + ) + .await + .unwrap(); + + // Send messages to new partitions + for partition_id in PARTITIONS_COUNT..PARTITIONS_COUNT + 3 { + for i in 0..10u32 { + let message = IggyMessage::from_str(&format!("p{partition_id}-msg{i}")).unwrap(); + let mut messages = vec![message]; + root_client + .send_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + &Partitioning::partition_id(partition_id), + &mut messages, + ) + .await + .unwrap(); + } + } + + // Let polling continue for a bit after partition add + sleep(Duration::from_secs(1)).await; + stop.store(true, Ordering::Relaxed); + let _ = poll_task_1.await; + let _ = poll_task_2.await; + + // 4. Verify: no duplicates, all partitions assigned + let cg = get_consumer_group(&root_client).await; + assert_eq!(cg.members_count, 2); + assert_unique_partition_assignments(&cg); + let total: u32 = cg.members.iter().map(|m| m.partitions_count).sum(); + assert_eq!( + total, 6, + "All 6 partitions must be assigned. Members: {:?}", + cg.members + ); + + // Both consumers should have polled something + let p1 = total_polled_1.load(Ordering::Relaxed); + let p2 = total_polled_2.load(Ordering::Relaxed); + assert!( + p1 + p2 > 0, + "Consumers should have polled at least some messages" + ); + + // Cleanup + root_client + .delete_stream(&Identifier::named(STREAM_NAME).unwrap()) + .await + .unwrap(); +} + +#[iggy_harness(test_client_transport = [Tcp, WebSocket, Quic], server( + heartbeat.enabled = true, + heartbeat.interval = "2s", + tcp.socket.override_defaults = true, + tcp.socket.nodelay = true +))] +async fn should_handle_partition_add_then_consumer_disconnect_then_new_join(harness: &TestHarness) { + let root_client = harness.root_client().await.unwrap(); + + setup_stream_topic_cg(&root_client).await; + send_one_message_per_partition(&root_client).await; + + // 1. Two consumers join + let client1 = harness.new_client().await.unwrap(); + let client2 = harness.new_client().await.unwrap(); + for client in [&client1, &client2] { + client + .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD) + .await + .unwrap(); + join_cg(client).await; + } + + sleep(Duration::from_millis(200)).await; + + // Both poll and auto-commit + let consumer = Consumer::group(Identifier::named(CONSUMER_GROUP_NAME).unwrap()); + for client in [&client1, &client2] { + let _ = client + .poll_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + None, + &consumer, + &PollingStrategy::next(), + 1, + true, + ) + .await + .unwrap(); + } + + // 2. Add 3 partitions (3 → 6) + root_client + .create_partitions( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + 3, + ) + .await + .unwrap(); + + for partition_id in PARTITIONS_COUNT..PARTITIONS_COUNT + 3 { + let message = IggyMessage::from_str(&format!("new-p{partition_id}")).unwrap(); + let mut messages = vec![message]; + root_client + .send_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + &Partitioning::partition_id(partition_id), + &mut messages, + ) + .await + .unwrap(); + } + + sleep(Duration::from_millis(500)).await; + + // 3. Consumer2 disconnects + drop(client2); + sleep(Duration::from_millis(500)).await; + + let cg = get_consumer_group(&root_client).await; + assert_eq!( + cg.members_count, 1, + "Only consumer1 should remain. Members: {:?}", + cg.members + ); + assert_eq!( + cg.members[0].partitions_count, 6, + "Consumer1 should have all 6 partitions after consumer2 left. Members: {:?}", + cg.members + ); + + // 4. New consumer3 joins + let client3 = harness.new_client().await.unwrap(); + client3 + .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD) + .await + .unwrap(); + join_cg(&client3).await; + + sleep(Duration::from_millis(500)).await; + + // 5. Verify: both members have partitions, no duplicates + let cg = get_consumer_group(&root_client).await; + assert_eq!(cg.members_count, 2); + assert_unique_partition_assignments(&cg); + let total: u32 = cg.members.iter().map(|m| m.partitions_count).sum(); + assert_eq!( + total, 6, + "All 6 partitions must be assigned. Members: {:?}", + cg.members + ); + for member in &cg.members { + assert!( + member.partitions_count >= 1, + "Both members must have partitions. Members: {:?}", + cg.members + ); + } + + // 6. Both can poll unique partitions + let mut polled_partitions = HashSet::new(); + for client in [&client1, &client3] { + let polled = client + .poll_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + None, + &consumer, + &PollingStrategy::offset(0), + 1, + false, + ) + .await + .unwrap(); + if !polled.messages.is_empty() { + assert!( + polled_partitions.insert(polled.partition_id), + "Duplicate partition {} after disconnect + rejoin!", + polled.partition_id + ); + } + } + assert_eq!( + polled_partitions.len(), + 2, + "Both consumers should poll from unique partitions" + ); + + // Cleanup + root_client + .delete_stream(&Identifier::named(STREAM_NAME).unwrap()) + .await + .unwrap(); +} + +#[iggy_harness(test_client_transport = [Tcp, WebSocket, Quic], server( + heartbeat.enabled = true, + heartbeat.interval = "2s", + tcp.socket.override_defaults = true, + tcp.socket.nodelay = true +))] +async fn should_handle_partition_delete_while_multiple_consumers_polling(harness: &TestHarness) { + let root_client = harness.root_client().await.unwrap(); + + // Start with 6 partitions + root_client.create_stream(STREAM_NAME).await.unwrap(); + root_client + .create_topic( + &Identifier::named(STREAM_NAME).unwrap(), + TOPIC_NAME, + 6, + CompressionAlgorithm::default(), + None, + IggyExpiry::NeverExpire, + MaxTopicSize::ServerDefault, + ) + .await + .unwrap(); + root_client + .create_consumer_group( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + CONSUMER_GROUP_NAME, + ) + .await + .unwrap(); + + // Send messages to all 6 partitions + for partition_id in 0..6u32 { + for i in 0..5u32 { + let message = IggyMessage::from_str(&format!("p{partition_id}-m{i}")).unwrap(); + let mut messages = vec![message]; + root_client + .send_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + &Partitioning::partition_id(partition_id), + &mut messages, + ) + .await + .unwrap(); + } + } + + // 1. Three consumers join (6 partitions / 3 = 2 each) + let client1 = Arc::new(harness.new_client().await.unwrap()); + let client2 = Arc::new(harness.new_client().await.unwrap()); + let client3 = Arc::new(harness.new_client().await.unwrap()); + for client in [&*client1, &*client2, &*client3] { + client + .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD) + .await + .unwrap(); + join_cg(client).await; + } + + sleep(Duration::from_millis(200)).await; + + // 2. Start concurrent poll loops + let stop = Arc::new(AtomicBool::new(false)); + let consumer = Consumer::group(Identifier::named(CONSUMER_GROUP_NAME).unwrap()); + + let mut poll_tasks = Vec::new(); + for client in [client1.clone(), client2.clone(), client3.clone()] { + let stop = stop.clone(); + let consumer = consumer.clone(); + poll_tasks.push(tokio::spawn(async move { + let mut count = 0u64; + while !stop.load(Ordering::Relaxed) { + let polled = client + .poll_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + None, + &consumer, + &PollingStrategy::next(), + 1, + true, + ) + .await; + if let Ok(p) = polled { + count += p.messages.len() as u64; + } + sleep(Duration::from_millis(10)).await; + } + count + })); + } + + // 3. While all 3 consumers are polling, delete 3 partitions (6 → 3) + sleep(Duration::from_millis(200)).await; + root_client + .delete_partitions( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + 3, + ) + .await + .unwrap(); + + // Let polling continue after delete + sleep(Duration::from_secs(1)).await; + stop.store(true, Ordering::Relaxed); + for task in poll_tasks { + let _ = task.await; + } + + // 4. Verify: 3 members, 3 remaining partitions, no duplicates + let cg = get_consumer_group(&root_client).await; + assert_eq!(cg.members_count, 3); + assert_unique_partition_assignments(&cg); + let total: u32 = cg.members.iter().map(|m| m.partitions_count).sum(); + assert_eq!( + total, 3, + "Only 3 partitions should remain. Members: {:?}", + cg.members + ); + for member in &cg.members { + assert_eq!( + member.partitions_count, 1, + "Each of 3 members should have exactly 1 of 3 remaining partitions. Members: {:?}", + cg.members + ); + } + + // Cleanup + root_client + .delete_stream(&Identifier::named(STREAM_NAME).unwrap()) + .await + .unwrap(); +} + +#[iggy_harness(test_client_transport = [Tcp, WebSocket, Quic], server( + heartbeat.enabled = true, + heartbeat.interval = "2s", + tcp.socket.override_defaults = true, + tcp.socket.nodelay = true +))] +async fn should_reach_even_distribution_after_multiple_joins(harness: &TestHarness) { + let root_client = harness.root_client().await.unwrap(); + + // Start with 6 partitions for cleaner math + root_client.create_stream(STREAM_NAME).await.unwrap(); + root_client + .create_topic( + &Identifier::named(STREAM_NAME).unwrap(), + TOPIC_NAME, + 6, + CompressionAlgorithm::default(), + None, + IggyExpiry::NeverExpire, + MaxTopicSize::ServerDefault, + ) + .await + .unwrap(); + root_client + .create_consumer_group( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + CONSUMER_GROUP_NAME, + ) + .await + .unwrap(); + + for partition_id in 0..6u32 { + let message = IggyMessage::from_str(&format!("msg-p{partition_id}")).unwrap(); + let mut messages = vec![message]; + root_client + .send_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + &Partitioning::partition_id(partition_id), + &mut messages, + ) + .await + .unwrap(); + } + + // 1. Consumer1 joins — gets all 6 partitions + let client1 = harness.new_client().await.unwrap(); + client1 + .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD) + .await + .unwrap(); + join_cg(&client1).await; + + sleep(Duration::from_millis(100)).await; + let cg = get_consumer_group(&root_client).await; + assert_eq!(cg.members[0].partitions_count, 6); + + // 2. Consumer2 joins — cooperative rebalance moves some partitions + // (never polled → immediate completion) + let client2 = harness.new_client().await.unwrap(); + client2 + .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD) + .await + .unwrap(); + join_cg(&client2).await; + + sleep(Duration::from_millis(200)).await; + let cg = get_consumer_group(&root_client).await; + assert_eq!(cg.members_count, 2); + assert_unique_partition_assignments(&cg); + let total: u32 = cg.members.iter().map(|m| m.partitions_count).sum(); + assert_eq!(total, 6); + + // 3. Consumer3 joins — another round of cooperative rebalance + let client3 = harness.new_client().await.unwrap(); + client3 + .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD) + .await + .unwrap(); + join_cg(&client3).await; + + sleep(Duration::from_millis(200)).await; + let cg = get_consumer_group(&root_client).await; + assert_eq!(cg.members_count, 3); + assert_unique_partition_assignments(&cg); + let total: u32 = cg.members.iter().map(|m| m.partitions_count).sum(); + assert_eq!(total, 6); + for member in &cg.members { + assert!( + member.partitions_count >= 1, + "Every member must have at least 1 partition. Members: {:?}", + cg.members + ); + } + + // 4. Consumer4 joins — with 6 partitions and 4 members, cooperative rebalance + // should give member4 at least 1 partition (fair_share=1, remainder=2) + let client4 = harness.new_client().await.unwrap(); + client4 + .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD) + .await + .unwrap(); + join_cg(&client4).await; + + sleep(Duration::from_millis(200)).await; + let cg = get_consumer_group(&root_client).await; + assert_eq!(cg.members_count, 4); + assert_unique_partition_assignments(&cg); + let total: u32 = cg.members.iter().map(|m| m.partitions_count).sum(); + assert_eq!(total, 6); + for member in &cg.members { + assert!( + member.partitions_count >= 1, + "Every member must have at least 1 partition with 4 members and 6 partitions. Members: {:?}", + cg.members + ); + } + + // 5. Consumer5 + Consumer6 join — 6 partitions, 6 consumers → exactly 1 each + let client5 = harness.new_client().await.unwrap(); + let client6 = harness.new_client().await.unwrap(); + for client in [&client5, &client6] { + client + .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD) + .await + .unwrap(); + join_cg(client).await; + sleep(Duration::from_millis(200)).await; + } + + sleep(Duration::from_millis(200)).await; + let cg = get_consumer_group(&root_client).await; + assert_eq!(cg.members_count, 6); + assert_unique_partition_assignments(&cg); + let total: u32 = cg.members.iter().map(|m| m.partitions_count).sum(); + assert_eq!(total, 6); + for member in &cg.members { + assert_eq!( + member.partitions_count, 1, + "With 6 members and 6 partitions, each must have exactly 1. Members: {:?}", + cg.members + ); + } + + // 6. All 6 consumers poll — must get unique partitions + let consumer = Consumer::group(Identifier::named(CONSUMER_GROUP_NAME).unwrap()); + let mut polled_partitions = HashSet::new(); + for client in [&client1, &client2, &client3, &client4, &client5, &client6] { + let polled = client + .poll_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + None, + &consumer, + &PollingStrategy::offset(0), + 10, + false, + ) + .await + .unwrap(); + if !polled.messages.is_empty() { + assert!( + polled_partitions.insert(polled.partition_id), + "Duplicate partition {} with 6 consumers on 6 partitions!", + polled.partition_id + ); + } + } + assert_eq!( + polled_partitions.len(), + 6, + "All 6 partitions must be covered by 6 consumers" + ); + + // Cleanup + root_client + .delete_stream(&Identifier::named(STREAM_NAME).unwrap()) + .await + .unwrap(); +} + +#[iggy_harness(test_client_transport = [Tcp, WebSocket, Quic], server( + heartbeat.enabled = true, + heartbeat.interval = "2s", + tcp.socket.override_defaults = true, + tcp.socket.nodelay = true +))] +async fn should_split_evenly_when_consumer_joins_after_partitions_added(harness: &TestHarness) { + let root_client = harness.root_client().await.unwrap(); + + setup_stream_topic_cg(&root_client).await; + send_one_message_per_partition(&root_client).await; + + // 1. Two consumers join, split 3 partitions (2+1) + let client1 = harness.new_client().await.unwrap(); + let client2 = harness.new_client().await.unwrap(); + for client in [&client1, &client2] { + client + .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD) + .await + .unwrap(); + join_cg(client).await; + } + + sleep(Duration::from_millis(200)).await; + let cg = get_consumer_group(&root_client).await; + assert_eq!(cg.members_count, 2); + let total: u32 = cg.members.iter().map(|m| m.partitions_count).sum(); + assert_eq!(total, 3); + + // 2. Add 3 partitions (3 -> 6), triggers full rebalance (3 each) + root_client + .create_partitions( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + 3, + ) + .await + .unwrap(); + + sleep(Duration::from_millis(500)).await; + + // 3. Send messages to new partitions + for partition_id in PARTITIONS_COUNT..PARTITIONS_COUNT + 3 { + let message = + IggyMessage::from_str(&format!("message-new-partition-{partition_id}")).unwrap(); + let mut messages = vec![message]; + root_client + .send_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + &Partitioning::partition_id(partition_id), + &mut messages, + ) + .await + .unwrap(); + } + + // 4. Consumer3 joins — now 3 consumers, 6 partitions → 2 each + let client3 = harness.new_client().await.unwrap(); + client3 + .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD) + .await + .unwrap(); + join_cg(&client3).await; + + sleep(Duration::from_millis(500)).await; + + let cg = get_consumer_group(&root_client).await; + assert_eq!(cg.members_count, 3); + assert_unique_partition_assignments(&cg); + let total: u32 = cg.members.iter().map(|m| m.partitions_count).sum(); + assert_eq!( + total, 6, + "All 6 partitions must be assigned. Members: {:?}", + cg.members + ); + for member in &cg.members { + assert!( + member.partitions_count >= 1, + "Every member must have at least 1 partition after cooperative rebalance. Members: {:?}", + cg.members + ); + } + + // 5. Each consumer polls — must get unique partitions, no duplicates + let consumer = Consumer::group(Identifier::named(CONSUMER_GROUP_NAME).unwrap()); + let mut polled_partitions = HashSet::new(); + for client in [&client1, &client2, &client3] { + let polled = client + .poll_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + None, + &consumer, + &PollingStrategy::offset(0), + 10, + false, + ) + .await + .unwrap(); + if !polled.messages.is_empty() { + assert!( + polled_partitions.insert(polled.partition_id), + "Duplicate partition {} polled by multiple consumers!", + polled.partition_id + ); + } + } + assert_eq!( + polled_partitions.len(), + 3, + "All 3 consumers should poll from unique partitions" + ); + + // Cleanup + root_client + .delete_stream(&Identifier::named(STREAM_NAME).unwrap()) + .await + .unwrap(); +} + +#[iggy_harness(test_client_transport = [Tcp, WebSocket, Quic], server( + heartbeat.enabled = true, + heartbeat.interval = "2s", + tcp.socket.override_defaults = true, + tcp.socket.nodelay = true +))] +async fn should_not_duplicate_messages_when_partitions_added_during_polling(harness: &TestHarness) { + let root_client = harness.root_client().await.unwrap(); + + setup_stream_topic_cg(&root_client).await; + send_one_message_per_partition(&root_client).await; + + // 1. Two consumers join and start polling + let client1 = harness.new_client().await.unwrap(); + let client2 = harness.new_client().await.unwrap(); + for client in [&client1, &client2] { + client + .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD) + .await + .unwrap(); + join_cg(client).await; + } + + let consumer = Consumer::group(Identifier::named(CONSUMER_GROUP_NAME).unwrap()); + + // 2. Both consumers poll without committing (in-flight work) + for client in [&client1, &client2] { + let _ = client + .poll_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + None, + &consumer, + &PollingStrategy::next(), + 1, + false, + ) + .await + .unwrap(); + } + + // 3. Add partitions WHILE consumers are active with polled-but-uncommitted data + root_client + .create_partitions( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + 3, + ) + .await + .unwrap(); + + sleep(Duration::from_millis(500)).await; + + // 4. After rebalance, verify no partition is assigned to both consumers + let cg = get_consumer_group(&root_client).await; + assert_eq!(cg.members_count, 2); + assert_unique_partition_assignments(&cg); + let total: u32 = cg.members.iter().map(|m| m.partitions_count).sum(); + assert_eq!( + total, 6, + "All 6 partitions must be assigned. Members: {:?}", + cg.members + ); + + // 5. Both consumers poll again — must not get same partition + let mut seen_by_client: Vec<HashSet<u32>> = Vec::new(); + for client in [&client1, &client2] { + let mut client_partitions = HashSet::new(); + for _ in 0..3 { + let polled = client + .poll_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + None, + &consumer, + &PollingStrategy::offset(0), + 1, + false, + ) + .await + .unwrap(); + if !polled.messages.is_empty() { + client_partitions.insert(polled.partition_id); + } + } + seen_by_client.push(client_partitions); + } + let overlap: HashSet<_> = seen_by_client[0].intersection(&seen_by_client[1]).collect(); + assert!( + overlap.is_empty(), + "Consumers polled overlapping partitions after rebalance: {:?}", + overlap + ); + + // Cleanup + root_client + .delete_stream(&Identifier::named(STREAM_NAME).unwrap()) + .await + .unwrap(); +} + +#[iggy_harness(test_client_transport = [Tcp, WebSocket, Quic], server( + heartbeat.enabled = true, + heartbeat.interval = "2s", + tcp.socket.override_defaults = true, + tcp.socket.nodelay = true +))] +async fn should_handle_delete_partitions_with_uncommitted_work(harness: &TestHarness) { + let root_client = harness.root_client().await.unwrap(); + + // Create with 6 partitions + root_client.create_stream(STREAM_NAME).await.unwrap(); + root_client + .create_topic( + &Identifier::named(STREAM_NAME).unwrap(), + TOPIC_NAME, + 6, + CompressionAlgorithm::default(), + None, + IggyExpiry::NeverExpire, + MaxTopicSize::ServerDefault, + ) + .await + .unwrap(); + root_client + .create_consumer_group( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + CONSUMER_GROUP_NAME, + ) + .await + .unwrap(); + + // Send messages to all 6 partitions + for partition_id in 0..6u32 { + let message = IggyMessage::from_str(&format!("msg-{partition_id}")).unwrap(); + let mut messages = vec![message]; + root_client + .send_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + &Partitioning::partition_id(partition_id), + &mut messages, + ) + .await + .unwrap(); + } + + // 1. Two consumers join, each gets 3 partitions + let client1 = harness.new_client().await.unwrap(); + let client2 = harness.new_client().await.unwrap(); + for client in [&client1, &client2] { + client + .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD) + .await + .unwrap(); + join_cg(client).await; + } + + // 2. Both consumers poll without committing + let consumer = Consumer::group(Identifier::named(CONSUMER_GROUP_NAME).unwrap()); + for client in [&client1, &client2] { + let _ = client + .poll_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + None, + &consumer, + &PollingStrategy::next(), + 1, + false, + ) + .await + .unwrap(); + } + + // 3. Delete 3 partitions while consumers have uncommitted work + root_client + .delete_partitions( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + 3, + ) + .await + .unwrap(); + + sleep(Duration::from_millis(500)).await; + + // 4. After rebalance: 3 remaining partitions, no duplicates, even split + let cg = get_consumer_group(&root_client).await; + assert_eq!(cg.members_count, 2); + assert_unique_partition_assignments(&cg); + let total: u32 = cg.members.iter().map(|m| m.partitions_count).sum(); + assert_eq!( + total, 3, + "3 remaining partitions must be assigned. Members: {:?}", + cg.members + ); + for member in &cg.members { + assert!( + member.partitions_count > 0, + "Every member should have at least 1 partition. Members: {:?}", + cg.members + ); + } + + // 5. Consumers can still poll from remaining partitions without error + for client in [&client1, &client2] { + let polled = client + .poll_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + None, + &consumer, + &PollingStrategy::offset(0), + 1, + false, + ) + .await + .unwrap(); + assert!( + !polled.messages.is_empty(), + "Consumer should still be able to poll from remaining partitions" + ); + } + + // Cleanup + root_client + .delete_stream(&Identifier::named(STREAM_NAME).unwrap()) + .await + .unwrap(); +} + +#[iggy_harness(test_client_transport = [Tcp, WebSocket, Quic], server( + heartbeat.enabled = true, + heartbeat.interval = "2s", + tcp.socket.override_defaults = true, + tcp.socket.nodelay = true +))] +async fn should_handle_rapid_partition_changes_with_active_consumers(harness: &TestHarness) { + let root_client = harness.root_client().await.unwrap(); + + setup_stream_topic_cg(&root_client).await; + send_one_message_per_partition(&root_client).await; + + // 1. Two consumers join + let client1 = harness.new_client().await.unwrap(); + let client2 = harness.new_client().await.unwrap(); + for client in [&client1, &client2] { + client + .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD) + .await + .unwrap(); + join_cg(client).await; + } + + sleep(Duration::from_millis(200)).await; + + // 2. Add partitions, then add more, then consumer3 joins — rapid changes + root_client + .create_partitions( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + 3, + ) + .await + .unwrap(); + + // Send messages to new partitions + for partition_id in 3..6u32 { + let message = IggyMessage::from_str(&format!("msg-{partition_id}")).unwrap(); + let mut messages = vec![message]; + root_client + .send_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + &Partitioning::partition_id(partition_id), + &mut messages, + ) + .await + .unwrap(); + } + + root_client + .create_partitions( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + 3, + ) + .await + .unwrap(); + + // Send messages to newest partitions + for partition_id in 6..9u32 { + let message = IggyMessage::from_str(&format!("msg-{partition_id}")).unwrap(); + let mut messages = vec![message]; + root_client + .send_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + &Partitioning::partition_id(partition_id), + &mut messages, + ) + .await + .unwrap(); + } + + let client3 = harness.new_client().await.unwrap(); + client3 + .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD) + .await + .unwrap(); + join_cg(&client3).await; + + sleep(Duration::from_millis(500)).await; + + // 3. 9 partitions, 3 consumers → 3 each + let cg = get_consumer_group(&root_client).await; + assert_eq!(cg.members_count, 3); + assert_unique_partition_assignments(&cg); + let total: u32 = cg.members.iter().map(|m| m.partitions_count).sum(); + assert_eq!( + total, 9, + "All 9 partitions must be assigned. Members: {:?}", + cg.members + ); + for member in &cg.members { + assert!( + member.partitions_count >= 1, + "Every member must have at least 1 partition after cooperative rebalance. Members: {:?}", + cg.members + ); + } + + // 4. All consumers poll — unique partitions only + let consumer = Consumer::group(Identifier::named(CONSUMER_GROUP_NAME).unwrap()); + let mut polled_partitions = HashSet::new(); + for client in [&client1, &client2, &client3] { + let polled = client + .poll_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + None, + &consumer, + &PollingStrategy::offset(0), + 10, + false, + ) + .await + .unwrap(); + if !polled.messages.is_empty() { + assert!( + polled_partitions.insert(polled.partition_id), + "Duplicate partition {} after rapid changes!", + polled.partition_id + ); + } + } + assert_eq!( + polled_partitions.len(), + 3, + "All 3 consumers should poll from unique partitions" + ); + + // Cleanup + root_client + .delete_stream(&Identifier::named(STREAM_NAME).unwrap()) + .await + .unwrap(); +} + +#[iggy_harness(test_client_transport = [Tcp, WebSocket, Quic], server( + heartbeat.enabled = true, + heartbeat.interval = "2s", + tcp.socket.override_defaults = true, + tcp.socket.nodelay = true +))] +async fn should_rebalance_after_adding_partitions(harness: &TestHarness) { + let root_client = harness.root_client().await.unwrap(); + + setup_stream_topic_cg(&root_client).await; + send_one_message_per_partition(&root_client).await; + + // 1. Three consumers join, each gets 1 of 3 partitions + let client1 = harness.new_client().await.unwrap(); + let client2 = harness.new_client().await.unwrap(); + let client3 = harness.new_client().await.unwrap(); + for client in [&client1, &client2, &client3] { + client + .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD) + .await + .unwrap(); + join_cg(client).await; + } + + sleep(Duration::from_millis(200)).await; + + let cg = get_consumer_group(&root_client).await; + assert_eq!(cg.members_count, 3); + assert_unique_partition_assignments(&cg); + for member in &cg.members { + assert_eq!( + member.partitions_count, 1, + "Each member should have 1 partition before adding. Members: {:?}", + cg.members + ); + } + + // 2. Add 3 more partitions (3 -> 6 total) + root_client + .create_partitions( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + 3, + ) + .await + .unwrap(); + + sleep(Duration::from_millis(500)).await; + + // 3. After rebalance, each consumer should have 2 partitions (6/3) + let cg = get_consumer_group(&root_client).await; + assert_eq!(cg.members_count, 3); + assert_unique_partition_assignments(&cg); + let total: u32 = cg.members.iter().map(|m| m.partitions_count).sum(); + assert_eq!( + total, 6, + "All 6 partitions must be assigned. Members: {:?}", + cg.members + ); + for member in &cg.members { + assert_eq!( + member.partitions_count, 2, + "Each member should have 2 partitions after adding 3 more. Members: {:?}", + cg.members + ); + } + + // Cleanup + root_client + .delete_stream(&Identifier::named(STREAM_NAME).unwrap()) + .await + .unwrap(); +} + +#[iggy_harness(test_client_transport = [Tcp, WebSocket, Quic], server( + heartbeat.enabled = true, + heartbeat.interval = "2s", + tcp.socket.override_defaults = true, + tcp.socket.nodelay = true +))] +async fn should_rebalance_after_deleting_partitions(harness: &TestHarness) { + let root_client = harness.root_client().await.unwrap(); + + // Create with 6 partitions + root_client.create_stream(STREAM_NAME).await.unwrap(); + root_client + .create_topic( + &Identifier::named(STREAM_NAME).unwrap(), + TOPIC_NAME, + 6, + CompressionAlgorithm::default(), + None, + IggyExpiry::NeverExpire, + MaxTopicSize::ServerDefault, + ) + .await + .unwrap(); + root_client + .create_consumer_group( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + CONSUMER_GROUP_NAME, + ) + .await + .unwrap(); + + // 1. Three consumers join, each gets 2 of 6 partitions + let client1 = harness.new_client().await.unwrap(); + let client2 = harness.new_client().await.unwrap(); + let client3 = harness.new_client().await.unwrap(); + for client in [&client1, &client2, &client3] { + client + .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD) + .await + .unwrap(); + join_cg(client).await; + } + + sleep(Duration::from_millis(200)).await; + + let cg = get_consumer_group(&root_client).await; + assert_eq!(cg.members_count, 3); + let total: u32 = cg.members.iter().map(|m| m.partitions_count).sum(); + assert_eq!(total, 6); + + // 2. Delete 3 partitions (6 -> 3) + root_client + .delete_partitions( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + 3, + ) + .await + .unwrap(); + + sleep(Duration::from_millis(500)).await; + + // 3. After rebalance, each consumer should have 1 partition (3/3) + let cg = get_consumer_group(&root_client).await; + assert_eq!(cg.members_count, 3); + assert_unique_partition_assignments(&cg); + let total: u32 = cg.members.iter().map(|m| m.partitions_count).sum(); + assert_eq!( + total, 3, + "All 3 remaining partitions must be assigned. Members: {:?}", + cg.members + ); + for member in &cg.members { + assert_eq!( + member.partitions_count, 1, + "Each member should have 1 partition after deleting 3. Members: {:?}", + cg.members + ); + } + + // Cleanup + root_client + .delete_stream(&Identifier::named(STREAM_NAME).unwrap()) + .await + .unwrap(); +} + +#[iggy_harness(test_client_transport = [Tcp, WebSocket, Quic], server( + heartbeat.enabled = true, + heartbeat.interval = "2s", + tcp.socket.override_defaults = true, + tcp.socket.nodelay = true +))] +async fn should_handle_partition_add_during_pending_revocation(harness: &TestHarness) { + let root_client = harness.root_client().await.unwrap(); + + setup_stream_topic_cg(&root_client).await; + send_one_message_per_partition(&root_client).await; + + // 1. Consumer1 joins, polls all 3 partitions without committing + let client1 = harness.new_client().await.unwrap(); + client1 + .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD) + .await + .unwrap(); + join_cg(&client1).await; + + let consumer = Consumer::group(Identifier::named(CONSUMER_GROUP_NAME).unwrap()); + for _ in 0..PARTITIONS_COUNT { + client1 + .poll_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + None, + &consumer, + &PollingStrategy::next(), + 1, + false, + ) + .await + .unwrap(); + } + + // 2. Consumer2 joins — triggers pending revocations + let client2 = harness.new_client().await.unwrap(); + client2 + .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD) + .await + .unwrap(); + join_cg(&client2).await; + + sleep(Duration::from_millis(100)).await; + + // 3. Add partitions WHILE revocations are pending + root_client + .create_partitions( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + 3, + ) + .await + .unwrap(); + + sleep(Duration::from_millis(500)).await; + + // 4. After full rebalance triggered by partition change, + // all 6 partitions should be distributed, no duplicates + let cg = get_consumer_group(&root_client).await; + assert_eq!(cg.members_count, 2); + assert_unique_partition_assignments(&cg); + let total: u32 = cg.members.iter().map(|m| m.partitions_count).sum(); + assert_eq!( + total, 6, + "All 6 partitions must be assigned after add during pending revocation. Members: {:?}", + cg.members + ); + for member in &cg.members { + assert_eq!( + member.partitions_count, 3, + "Each of 2 members should have 3 of 6 partitions. Members: {:?}", + cg.members + ); + } + + // Cleanup + root_client + .delete_stream(&Identifier::named(STREAM_NAME).unwrap()) + .await + .unwrap(); +} + #[iggy_harness(test_client_transport = [Tcp, WebSocket, Quic], server( consumer_group.rebalancing_timeout = "3s", consumer_group.rebalancing_check_interval = "1s" @@ -750,11 +2116,9 @@ async fn get_consumer_group(client: &IggyClient) -> ConsumerGroupDetails { fn assert_unique_partition_assignments(cg: &ConsumerGroupDetails) { let mut all_partitions = HashSet::new(); - let mut total_assigned = 0u32; for member in &cg.members { for &partition in &member.partitions { - total_assigned += 1; assert!( all_partitions.insert(partition), "Partition {partition} is assigned to multiple members! \ @@ -764,13 +2128,6 @@ fn assert_unique_partition_assignments(cg: &ConsumerGroupDetails) { ); } } - - assert_eq!( - total_assigned, PARTITIONS_COUNT, - "Expected {PARTITIONS_COUNT} total partition assignments, got {total_assigned}. \ - Members: {:?}", - cg.members - ); } async fn setup_stream_topic_cg(client: &IggyClient) { @@ -1614,3 +2971,217 @@ async fn should_distribute_partitions_evenly_with_concurrent_joins(harness: &Tes .await .unwrap(); } + +#[iggy_harness(test_client_transport = [Tcp, WebSocket, Quic], server( + heartbeat.enabled = true, + heartbeat.interval = "2s", + tcp.socket.override_defaults = true, + tcp.socket.nodelay = true +))] +async fn should_not_assign_partition_to_wrong_member_after_slab_reuse(harness: &TestHarness) { + let root_client = harness.root_client().await.unwrap(); + + setup_stream_topic_cg(&root_client).await; + send_one_message_per_partition(&root_client).await; + + // 1. Consumer1 joins, gets all 3 partitions, polls without committing + let client1 = harness.new_client().await.unwrap(); + client1 + .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD) + .await + .unwrap(); + join_cg(&client1).await; + + let consumer = Consumer::group(Identifier::named(CONSUMER_GROUP_NAME).unwrap()); + for _ in 0..PARTITIONS_COUNT { + client1 + .poll_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + None, + &consumer, + &PollingStrategy::next(), + 1, + false, + ) + .await + .unwrap(); + } + + // 2. Consumer2 joins — triggers pending revocation targeting consumer2's slab + let client2 = harness.new_client().await.unwrap(); + client2 + .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD) + .await + .unwrap(); + join_cg(&client2).await; + + sleep(Duration::from_millis(100)).await; + + let cg = get_consumer_group(&root_client).await; + assert_eq!(cg.members_count, 2); + + // 3. Consumer2 (revocation target) disconnects — its slab is freed + drop(client2); + sleep(Duration::from_secs(3)).await; + + // 4. Consumer3 joins — may reuse consumer2's old slab + let client3 = harness.new_client().await.unwrap(); + client3 + .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD) + .await + .unwrap(); + join_cg(&client3).await; + + // 5. Consumer1 commits all offsets — revocation completion should detect + // slab reuse via target_member_id validation and trigger full rebalance + for partition_id in 0..PARTITIONS_COUNT { + client1 + .store_consumer_offset( + &consumer, + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + Some(partition_id), + 0, + ) + .await + .unwrap(); + } + + sleep(Duration::from_millis(500)).await; + + // 6. Verify no partition duplication and all partitions assigned + let cg = get_consumer_group(&root_client).await; + assert_unique_partition_assignments(&cg); + let total: u32 = cg.members.iter().map(|m| m.partitions_count).sum(); + assert_eq!( + total, PARTITIONS_COUNT, + "All partitions must be assigned after slab reuse scenario. Members: {:?}", + cg.members + ); + + // Cleanup + root_client + .delete_stream(&Identifier::named(STREAM_NAME).unwrap()) + .await + .unwrap(); +} + +#[iggy_harness(test_client_transport = [Tcp, WebSocket, Quic], server( + heartbeat.enabled = true, + heartbeat.interval = "2s", + tcp.socket.override_defaults = true, + tcp.socket.nodelay = true +))] +async fn should_not_complete_other_members_revocations_on_leave(harness: &TestHarness) { + let root_client = harness.root_client().await.unwrap(); + + setup_stream_topic_cg(&root_client).await; + send_one_message_per_partition(&root_client).await; + + // 1. Consumer1 joins, gets all 3 partitions, polls without committing + let client1 = harness.new_client().await.unwrap(); + client1 + .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD) + .await + .unwrap(); + join_cg(&client1).await; + + let consumer = Consumer::group(Identifier::named(CONSUMER_GROUP_NAME).unwrap()); + for _ in 0..PARTITIONS_COUNT { + client1 + .poll_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + None, + &consumer, + &PollingStrategy::next(), + 1, + false, + ) + .await + .unwrap(); + } + + // 2. Consumer2 joins — triggers pending revocation for 1 partition + let client2 = harness.new_client().await.unwrap(); + client2 + .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD) + .await + .unwrap(); + join_cg(&client2).await; + + // 3. Consumer3 joins — triggers another pending revocation + let client3 = harness.new_client().await.unwrap(); + client3 + .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD) + .await + .unwrap(); + join_cg(&client3).await; + + sleep(Duration::from_millis(100)).await; + + // Consumer1 has in-flight work on all partitions (polled, not committed). + // Consumer2 and consumer3 have 0 partitions (waiting for revocations). + let cg = get_consumer_group(&root_client).await; + assert_eq!(cg.members_count, 3); + + // 4. Consumer3 disconnects — triggers full rebalance (rebalance_members). + // Leave clears only consumer3's polled offsets, not consumer1's. + drop(client3); + sleep(Duration::from_secs(3)).await; + + // 5. After full rebalance, consumer1 and consumer2 split all partitions. + // The key invariant: no partition is assigned to two members. + let cg = get_consumer_group(&root_client).await; + assert_eq!(cg.members_count, 2); + assert_unique_partition_assignments(&cg); + let total: u32 = cg.members.iter().map(|m| m.partitions_count).sum(); + assert_eq!( + total, PARTITIONS_COUNT, + "All partitions must be assigned after leave. Members: {:?}", + cg.members + ); + for member in &cg.members { + assert!( + member.partitions_count > 0, + "Both remaining members should have partitions after leave rebalance. Members: {:?}", + cg.members + ); + } + + // 6. Both consumers can poll without duplicates + let mut all_partitions = HashSet::new(); + for client in [&client1, &client2] { + let polled = client + .poll_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + None, + &consumer, + &PollingStrategy::offset(0), + 1, + false, + ) + .await + .unwrap(); + if !polled.messages.is_empty() { + assert!( + all_partitions.insert(polled.partition_id), + "Duplicate partition {} after leave rebalance", + polled.partition_id + ); + } + } + assert_eq!( + all_partitions.len(), + 2, + "Both consumers should poll from unique partitions after leave" + ); + + // Cleanup + root_client + .delete_stream(&Identifier::named(STREAM_NAME).unwrap()) + .await + .unwrap(); +} diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs index ea10b1816..3d95fb9c3 100644 --- a/core/server/src/bootstrap.rs +++ b/core/server/src/bootstrap.rs @@ -657,7 +657,6 @@ pub fn build_inner_metadata( users_can_send_all_streams: Default::default(), users_can_poll_stream: Default::default(), users_can_send_stream: Default::default(), - pending_revocations_count: Default::default(), } } diff --git a/core/server/src/metadata/absorb.rs b/core/server/src/metadata/absorb.rs index ef224a0c3..a90734f2a 100644 --- a/core/server/src/metadata/absorb.rs +++ b/core/server/src/metadata/absorb.rs @@ -309,7 +309,7 @@ fn apply_op( if !group.members.iter().any(|(_, m)| m.client_id == *client_id) { let new_member = ConsumerGroupMemberMeta::new(next_id, *client_id); group.members.insert(new_member); - group.assign_partitions_to_new_members(); + group.rebalance_cooperative(); } if populate_ids { @@ -343,13 +343,23 @@ fn apply_op( if populate_ids { removed_member_id.store(member_id, Ordering::Release); } + // Partitions owned by the leaving member + let leaving_partitions: Vec<usize> = group + .members + .get(member_id) + .map(|m| m.partitions.clone()) + .unwrap_or_default(); + group.members.remove(member_id); group.rebalance_members(); + // Clear polled offsets only for the leaving member's partitions let consumer_group_id = ConsumerGroupId(*group_id); - for partition in topic.partitions.iter() { - let guard = partition.last_polled_offsets.pin(); - guard.remove(&consumer_group_id); + for partition_id in leaving_partitions { + if let Some(partition) = topic.partitions.get(partition_id) { + let guard = partition.last_polled_offsets.pin(); + guard.remove(&consumer_group_id); + } } } } @@ -388,11 +398,13 @@ fn apply_op( member_slab_id, member_id, partition_id, + timed_out: _, } => { if let Some(stream) = metadata.streams.get_mut(*stream_id) && let Some(topic) = stream.topics.get_mut(*topic_id) && let Some(group) = topic.consumer_groups.get_mut(*group_id) { + // Pre-validated by maybe_complete_pending_revocation before dispatch. group.complete_revocation(*member_slab_id, *member_id, *partition_id); } } diff --git a/core/server/src/metadata/consumer_group.rs b/core/server/src/metadata/consumer_group.rs index d26734fc1..c29a82e37 100644 --- a/core/server/src/metadata/consumer_group.rs +++ b/core/server/src/metadata/consumer_group.rs @@ -26,6 +26,7 @@ use slab::Slab; use std::collections::HashSet; use std::sync::Arc; use std::sync::atomic::Ordering; +use tracing::warn; #[derive(Clone, Debug)] pub struct ConsumerGroupMeta { @@ -66,8 +67,9 @@ impl ConsumerGroupMeta { } } - /// Incremental partition assignment for a new member join. - pub fn assign_partitions_to_new_members(&mut self) { + /// Cooperative rebalance: assign unassigned partitions to idle members and + /// mark excess partitions on over-assigned members as pending revocation. + pub fn rebalance_cooperative(&mut self) { let member_count = self.members.len(); if member_count == 0 || self.partitions.is_empty() { return; @@ -200,10 +202,16 @@ impl ConsumerGroupMeta { break; } let idle_id = idle_slab_ids.remove(0); + let target_member_id = self + .members + .get(idle_id) + .map(|m| m.id) + .unwrap_or(usize::MAX); if let Some(member) = self.members.get_mut(mid) { member.pending_revocations.push(PendingRevocation { partition_id, target_slab_id: idle_id, + target_member_id, created_at_micros: IggyTimestamp::now().as_micros(), }); } @@ -226,7 +234,7 @@ impl ConsumerGroupMeta { }; let last_polled = { let guard = partition.last_polled_offsets.pin(); - guard.get(&cg_id).map(|v| v.load(Ordering::Relaxed)) + guard.get(&cg_id).map(|v| v.load(Ordering::Acquire)) }; let can_complete = match last_polled { None => true, @@ -234,7 +242,7 @@ impl ConsumerGroupMeta { let offsets_guard = partition.consumer_group_offsets.pin(); offsets_guard .get(&cg_id) - .map(|offset| offset.offset.load(Ordering::Relaxed)) + .map(|offset| offset.offset.load(Ordering::Acquire)) .is_some_and(|committed| committed >= polled) } }; @@ -257,8 +265,12 @@ impl ConsumerGroupMeta { member_id: usize, partition_id: PartitionId, ) -> bool { - let target_slab_id = if let Some(member) = self.members.get_mut(member_slab_id) { + let target_info = if let Some(member) = self.members.get_mut(member_slab_id) { if member.id != member_id { + warn!( + "Revocation rejected: member ID mismatch (slab={member_slab_id}, expected={member_id}, actual={})", + member.id + ); return false; } let pos = member @@ -267,21 +279,33 @@ impl ConsumerGroupMeta { .position(|revocation| revocation.partition_id == partition_id); if let Some(pos) = pos { let removed = member.pending_revocations.remove(pos); - let target = removed.target_slab_id; member.partitions.retain(|&p| p != partition_id); - Some(target) + Some((removed.target_slab_id, removed.target_member_id)) } else { + warn!( + "Revocation rejected: no pending revocation for partition={partition_id} on slab={member_slab_id}" + ); None } } else { + warn!("Revocation rejected: source slab={member_slab_id} not found"); None }; - if let Some(target) = target_slab_id { - if let Some(target_member) = self.members.get_mut(target) { + if let Some((target_slab, expected_target_id)) = target_info { + if let Some(target_member) = self.members.get_mut(target_slab) { + if target_member.id != expected_target_id { + warn!( + "Revocation target slab={target_slab} reused (expected={expected_target_id}, actual={}), full rebalance", + target_member.id + ); + self.rebalance_members(); + return true; + } target_member.partitions.push(partition_id); return true; } + warn!("Revocation target slab={target_slab} gone, full rebalance"); self.rebalance_members(); return true; } diff --git a/core/server/src/metadata/consumer_group_member.rs b/core/server/src/metadata/consumer_group_member.rs index d7c7486c1..7605de0cc 100644 --- a/core/server/src/metadata/consumer_group_member.rs +++ b/core/server/src/metadata/consumer_group_member.rs @@ -24,6 +24,7 @@ use std::sync::atomic::AtomicUsize; pub struct PendingRevocation { pub partition_id: PartitionId, pub target_slab_id: usize, + pub target_member_id: usize, pub created_at_micros: u64, } diff --git a/core/server/src/metadata/inner.rs b/core/server/src/metadata/inner.rs index 36195fb5f..4bcc165f5 100644 --- a/core/server/src/metadata/inner.rs +++ b/core/server/src/metadata/inner.rs @@ -20,7 +20,6 @@ use ahash::{AHashMap, AHashSet}; use iggy_common::{GlobalPermissions, PersonalAccessToken, StreamPermissions}; use slab::Slab; use std::sync::Arc; -use std::sync::atomic::AtomicUsize; #[derive(Clone, Default)] pub struct InnerMetadata { @@ -46,9 +45,6 @@ pub struct InnerMetadata { pub users_can_send_all_streams: AHashSet<UserId>, pub users_can_poll_stream: AHashSet<(UserId, StreamId)>, pub users_can_send_stream: AHashSet<(UserId, StreamId)>, - - /// Global pending revocation count, shared across left-right copies. - pub pending_revocations_count: Arc<AtomicUsize>, } impl InnerMetadata { diff --git a/core/server/src/metadata/ops.rs b/core/server/src/metadata/ops.rs index e9e38148d..a6d80c237 100644 --- a/core/server/src/metadata/ops.rs +++ b/core/server/src/metadata/ops.rs @@ -129,5 +129,6 @@ pub enum MetadataOp { member_slab_id: usize, member_id: usize, partition_id: PartitionId, + timed_out: bool, }, } diff --git a/core/server/src/metadata/reader.rs b/core/server/src/metadata/reader.rs index dfcd0c0ef..ba60e177f 100644 --- a/core/server/src/metadata/reader.rs +++ b/core/server/src/metadata/reader.rs @@ -253,83 +253,6 @@ impl Metadata { Some(current % partitions_count) } - pub fn get_next_member_partition_id( - &self, - stream_id: StreamId, - topic_id: TopicId, - group_id: ConsumerGroupId, - member_id: usize, - calculate: bool, - ) -> Option<PartitionId> { - let metadata = self.load(); - let member = metadata - .streams - .get(stream_id)? - .topics - .get(topic_id)? - .consumer_groups - .get(group_id)? - .members - .get(member_id)?; - - // Fast path: no pending revocations - if member.pending_revocations.is_empty() { - let partitions = &member.partitions; - let count = partitions.len(); - if count == 0 { - return None; - } - let counter = &member.partition_index; - return if calculate { - let current = counter - .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |c| { - Some((c + 1) % count) - }) - .unwrap(); - Some(partitions[current % count]) - } else { - let current = counter.load(Ordering::Relaxed); - Some(partitions[current % count]) - }; - } - - // Slow path: skip revoked partitions via linear scan - let effective_count = member.partitions.len() - member.pending_revocations.len(); - if effective_count == 0 { - return None; - } - - let counter = &member.partition_index; - let idx = if calculate { - counter - .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |c| { - Some((c + 1) % effective_count) - }) - .unwrap() - % effective_count - } else { - counter.load(Ordering::Relaxed) % effective_count - }; - - // Find the idx-th non-revoked partition - let mut seen = 0; - for &pid in &member.partitions { - let is_revoked = member - .pending_revocations - .iter() - .any(|revocation| revocation.partition_id == pid); - if is_revoked { - continue; - } - if seen == idx { - return Some(pid); - } - seen += 1; - } - - None - } - /// Resolve consumer group partition under a single metadata read guard. pub fn resolve_consumer_group_partition( &self, @@ -523,7 +446,7 @@ impl Metadata { let guard = partition.last_polled_offsets.pin(); match guard.get(&key) { Some(existing) => { - existing.store(offset, Ordering::Relaxed); + existing.store(offset, Ordering::Release); } None => { guard.insert(key, Arc::new(std::sync::atomic::AtomicU64::new(offset))); @@ -532,30 +455,6 @@ impl Metadata { } } - /// Get the last offset polled by a consumer group for a partition. - pub fn get_last_polled_offset( - &self, - stream_id: StreamId, - topic_id: TopicId, - group_id: ConsumerGroupId, - partition_id: PartitionId, - ) -> Option<u64> { - use crate::streaming::polling_consumer::ConsumerGroupId as CgIdNewtype; - - let metadata = self.load(); - metadata - .streams - .get(stream_id) - .and_then(|s| s.topics.get(topic_id)) - .and_then(|t| t.partitions.get(partition_id)) - .and_then(|p| { - let guard = p.last_polled_offsets.pin(); - guard - .get(&CgIdNewtype(group_id)) - .map(|v| v.load(Ordering::Relaxed)) - }) - } - pub fn users_count(&self) -> usize { self.load().users.len() } diff --git a/core/server/src/metadata/writer.rs b/core/server/src/metadata/writer.rs index 3afa38d5f..4712679c8 100644 --- a/core/server/src/metadata/writer.rs +++ b/core/server/src/metadata/writer.rs @@ -302,6 +302,7 @@ impl MetadataWriter { self.publish(); } + #[allow(clippy::too_many_arguments)] pub fn complete_partition_revocation( &mut self, stream_id: StreamId, @@ -310,6 +311,7 @@ impl MetadataWriter { member_slab_id: usize, member_id: usize, partition_id: PartitionId, + timed_out: bool, ) { self.append(MetadataOp::CompletePartitionRevocation { stream_id, @@ -318,6 +320,7 @@ impl MetadataWriter { member_slab_id, member_id, partition_id, + timed_out, }); self.publish(); } diff --git a/core/server/src/shard/handlers.rs b/core/server/src/shard/handlers.rs index 7cbbb8368..505eb50f1 100644 --- a/core/server/src/shard/handlers.rs +++ b/core/server/src/shard/handlers.rs @@ -359,6 +359,7 @@ async fn handle_request( member_slab_id, member_id, partition_id, + timed_out, } => { assert_eq!( shard.id, 0, @@ -372,6 +373,7 @@ async fn handle_request( member_slab_id, member_id, partition_id, + timed_out, ); Ok(ShardResponse::CompletePartitionRevocationResponse) diff --git a/core/server/src/shard/system/consumer_groups.rs b/core/server/src/shard/system/consumer_groups.rs index 505bcd776..327ba2b14 100644 --- a/core/server/src/shard/system/consumer_groups.rs +++ b/core/server/src/shard/system/consumer_groups.rs @@ -117,6 +117,7 @@ impl IggyShard { revocation.slab_id, revocation.member_id, revocation.partition_id, + false, ); } diff --git a/core/server/src/shard/system/consumer_offsets.rs b/core/server/src/shard/system/consumer_offsets.rs index e42cffc94..d03d7971b 100644 --- a/core/server/src/shard/system/consumer_offsets.rs +++ b/core/server/src/shard/system/consumer_offsets.rs @@ -298,7 +298,7 @@ impl IggyShard { let path = format!("{}/{}", dir_path, id); ConsumerOffset::new(ConsumerKind::Consumer, *id as u32, offset, path) }); - entry.offset.store(offset, Ordering::Relaxed); + entry.offset.store(offset, Ordering::Release); } PollingConsumer::ConsumerGroup(cg_id, _) => { let Some(offsets) = self.metadata.get_partition_consumer_group_offsets( @@ -319,7 +319,7 @@ impl IggyShard { let path = format!("{}/{}", dir_path, cg_id.0); ConsumerOffset::new(ConsumerKind::ConsumerGroup, cg_id.0 as u32, offset, path) }); - entry.offset.store(offset, Ordering::Relaxed); + entry.offset.store(offset, Ordering::Release); } } } @@ -449,7 +449,7 @@ impl IggyShard { let partition = topic.partitions.get(partition_id)?; let last_polled = { let guard = partition.last_polled_offsets.pin(); - guard.get(group_id).map(|v| v.load(Ordering::Relaxed)) + guard.get(group_id).map(|v| v.load(Ordering::Acquire)) }; let can_complete = match last_polled { None => true, @@ -457,7 +457,7 @@ impl IggyShard { let guard = partition.consumer_group_offsets.pin(); guard .get(group_id) - .map(|co| co.offset.load(Ordering::Relaxed)) + .map(|co| co.offset.load(Ordering::Acquire)) .is_some_and(|c| c >= polled) } }; @@ -473,6 +473,7 @@ impl IggyShard { member_slab_id: member_id.0, member_id: logical_member_id, partition_id, + timed_out: false, }); let _ = self.send_to_control_plane(request).await; } diff --git a/core/server/src/shard/system/messages.rs b/core/server/src/shard/system/messages.rs index 68daaf741..d8ce2d891 100644 --- a/core/server/src/shard/system/messages.rs +++ b/core/server/src/shard/system/messages.rs @@ -265,7 +265,7 @@ impl IggyShard { ), ), ); - item.offset.store(offset, Ordering::Relaxed); + item.offset.store(offset, Ordering::Release); (item.offset.load(Ordering::Relaxed), item.path.clone()) } PollingConsumer::ConsumerGroup(consumer_group_id, _) => { @@ -287,7 +287,7 @@ impl IggyShard { ), ), ); - item.offset.store(offset, Ordering::Relaxed); + item.offset.store(offset, Ordering::Release); (item.offset.load(Ordering::Relaxed), item.path.clone()) } } diff --git a/core/server/src/shard/tasks/periodic/revocation_timeout.rs b/core/server/src/shard/tasks/periodic/revocation_timeout.rs index d9b5c7abd..21dc68cae 100644 --- a/core/server/src/shard/tasks/periodic/revocation_timeout.rs +++ b/core/server/src/shard/tasks/periodic/revocation_timeout.rs @@ -96,6 +96,7 @@ async fn check_revocation_timeouts(shard: Rc<IggyShard>) -> Result<(), IggyError member_slab_id, member_id, partition_id, + timed_out: true, }); let _ = shard.send_to_control_plane(request).await; } diff --git a/core/server/src/shard/transmission/message.rs b/core/server/src/shard/transmission/message.rs index 7fbaddf37..b533eddf1 100644 --- a/core/server/src/shard/transmission/message.rs +++ b/core/server/src/shard/transmission/message.rs @@ -229,6 +229,7 @@ pub enum ShardRequestPayload { member_slab_id: usize, member_id: usize, partition_id: usize, + timed_out: bool, }, // Control-plane: PAT operations
