This is an automated email from the ASF dual-hosted git repository.

piotr pushed a commit to branch collaborative_rebalancing
in repository https://gitbox.apache.org/repos/asf/iggy.git
commit cbdd441ece5f4a78706a12862e282560fd77d380
Author: spetz <[email protected]>
AuthorDate: Thu Feb 12 23:50:41 2026 +0100

    feat(server): cooperative partition reassignment for consumer groups
---
 Cargo.lock                                         |    2 +-
 DEPENDENCIES.md                                    |    2 +-
 core/integration/tests/data_integrity/mod.rs       |    1 +
 .../verify_consumer_group_partition_assignment.rs  | 1329 ++++++++++++++++++++
 core/server/Cargo.toml                             |    2 +-
 core/server/src/metadata/absorb.rs                 |   56 +-
 core/server/src/metadata/consumer_group.rs         |  216 +++-
 core/server/src/metadata/consumer_group_member.rs  |    8 +-
 core/server/src/metadata/ops.rs                    |   12 +-
 core/server/src/metadata/reader.rs                 |  116 +-
 core/server/src/metadata/writer.rs                 |   20 +
 core/server/src/shard/handlers.rs                  |   24 +
 core/server/src/shard/system/consumer_offsets.rs   |   71 +-
 core/server/src/shard/system/messages.rs           |   23 +
 core/server/src/shard/transmission/frame.rs        |    1 +
 core/server/src/shard/transmission/message.rs      |    8 +
 16 files changed, 1864 insertions(+), 27 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index f16507888..0eab45f5d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -8311,7 +8311,7 @@ dependencies = [
 
 [[package]]
 name = "server"
-version = "0.6.3-edge.2"
+version = "0.6.3-edge.3"
 dependencies = [
  "ahash 0.8.12",
  "anyhow",
diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md
index a9e14e6d4..6a3b6aff0 100644
--- a/DEPENDENCIES.md
+++ b/DEPENDENCIES.md
@@ -724,7 +724,7 @@ serde_with_macros: 3.16.1, "Apache-2.0 OR MIT",
 serde_yaml_ng: 0.10.0, "MIT",
 serial_test: 3.3.1, "MIT",
 serial_test_derive: 3.3.1, "MIT",
-server: 0.6.3-edge.2, "Apache-2.0",
+server: 0.6.3-edge.3, "Apache-2.0",
 sha1: 0.10.6, "Apache-2.0 OR MIT",
 sha2: 0.10.9, "Apache-2.0 OR MIT",
 sha3: 0.10.8, "Apache-2.0 OR MIT",
diff --git a/core/integration/tests/data_integrity/mod.rs b/core/integration/tests/data_integrity/mod.rs
index 742e3128e..7df802e24 100644
--- a/core/integration/tests/data_integrity/mod.rs
+++ b/core/integration/tests/data_integrity/mod.rs
@@ -17,4 +17,5 @@
  */
 
 mod verify_after_server_restart;
+mod verify_consumer_group_partition_assignment;
 mod verify_user_login_after_restart;
diff --git a/core/integration/tests/data_integrity/verify_consumer_group_partition_assignment.rs b/core/integration/tests/data_integrity/verify_consumer_group_partition_assignment.rs
new file mode 100644
index 000000000..203ba881e
--- /dev/null
+++ b/core/integration/tests/data_integrity/verify_consumer_group_partition_assignment.rs
@@ -0,0 +1,1329 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ + +use iggy::prelude::*; +use integration::iggy_harness; +use std::collections::HashSet; +use std::str::FromStr; +use std::sync::Arc; +use tokio::time::{Duration, sleep}; + +const STREAM_NAME: &str = "cg-partition-test-stream"; +const TOPIC_NAME: &str = "cg-partition-test-topic"; +const CONSUMER_GROUP_NAME: &str = "cg-partition-test-group"; +const PARTITIONS_COUNT: u32 = 3; + +async fn create_stale_tcp_client(server_addr: &str) -> IggyClient { + let config = TcpClientConfig { + server_address: server_addr.to_string(), + heartbeat_interval: IggyDuration::from_str("1h").unwrap(), + nodelay: true, + ..TcpClientConfig::default() + }; + let client = TcpClient::create(Arc::new(config)).unwrap(); + Client::connect(&client).await.unwrap(); + IggyClient::create(ClientWrapper::Tcp(client), None, None) +} + +async fn create_tcp_client(server_addr: &str) -> IggyClient { + let config = TcpClientConfig { + server_address: server_addr.to_string(), + heartbeat_interval: IggyDuration::from_str("500ms").unwrap(), + nodelay: true, + ..TcpClientConfig::default() + }; + let client = TcpClient::create(Arc::new(config)).unwrap(); + Client::connect(&client).await.unwrap(); + IggyClient::create(ClientWrapper::Tcp(client), None, None) +} + +#[iggy_harness(server( + heartbeat.enabled = true, + heartbeat.interval = "2s", + tcp.socket.override_defaults = true, + tcp.socket.nodelay = true +))] +async fn should_not_duplicate_partition_assignments_after_stale_client_cleanup( + harness: &TestHarness, +) { + let server_addr = harness.server().raw_tcp_addr().unwrap(); + + let root_client = create_tcp_client(&server_addr).await; + root_client + .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD) + .await + .unwrap(); + + // 1. Create stream, topic with 3 partitions, consumer group + root_client.create_stream(STREAM_NAME).await.unwrap(); + root_client + .create_topic( + &Identifier::named(STREAM_NAME).unwrap(), + TOPIC_NAME, + PARTITIONS_COUNT, + CompressionAlgorithm::default(), + None, + IggyExpiry::NeverExpire, + MaxTopicSize::ServerDefault, + ) + .await + .unwrap(); + root_client + .create_consumer_group( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + CONSUMER_GROUP_NAME, + ) + .await + .unwrap(); + + // 2. Send a message to each partition + for partition_id in 0..PARTITIONS_COUNT { + let message = IggyMessage::from_str(&format!("message-partition-{partition_id}")).unwrap(); + let mut messages = vec![message]; + root_client + .send_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + &Partitioning::partition_id(partition_id), + &mut messages, + ) + .await + .unwrap(); + } + + // 3. Create 3 "stale" TCP clients (1h heartbeat - server will detect them as stale + // after ~2.4s because they won't send any heartbeat). + let stale_client1 = create_stale_tcp_client(&server_addr).await; + let stale_client2 = create_stale_tcp_client(&server_addr).await; + let stale_client3 = create_stale_tcp_client(&server_addr).await; + + for client in [&stale_client1, &stale_client2, &stale_client3] { + client + .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD) + .await + .unwrap(); + client + .join_consumer_group( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + &Identifier::named(CONSUMER_GROUP_NAME).unwrap(), + ) + .await + .unwrap(); + } + + // 4. 
Verify initial state: 3 members, each with 1 unique partition + let cg = get_consumer_group(&root_client).await; + assert_eq!(cg.members_count, 3, "Expected 3 members before kill"); + assert_unique_partition_assignments(&cg); + + // 5. Poll a message from stale_client1 WITHOUT committing offset (simulating in-flight work) + let consumer = Consumer::group(Identifier::named(CONSUMER_GROUP_NAME).unwrap()); + let polled = stale_client1 + .poll_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + None, + &consumer, + &PollingStrategy::next(), + 1, + false, // manual ack - offset NOT stored + ) + .await + .unwrap(); + assert!( + !polled.messages.is_empty(), + "stale_client1 should have polled at least one message" + ); + + // 6. DO NOT drop stale clients - simulating kill -9 (no TCP FIN). + // We keep them alive in scope but won't use them again. + // The server will detect them as stale after ~2.4s (heartbeat timeout). + + // 7. Wait for the server's heartbeat verifier to evict the stale clients. + // Server heartbeat interval = 2s, threshold = 2s * 1.2 = 2.4s. + // Stale clients' heartbeat interval is 1h so they won't ping. + // But they DID send one initial ping on connect, so we wait for that to expire. + // Give it 5s to be safe. + sleep(Duration::from_secs(5)).await; + + // 8. Verify ghosts have been evicted + let cg = get_consumer_group(&root_client).await; + assert_eq!( + cg.members_count, 0, + "Expected 0 members after heartbeat eviction of stale clients, got {}. Members: {:?}", + cg.members_count, cg.members + ); + + // 9. Now create 3 new clients and join same CG (simulating app restart after kill -9). + let client1 = create_tcp_client(&server_addr).await; + let client2 = create_tcp_client(&server_addr).await; + let client3 = create_tcp_client(&server_addr).await; + + for client in [&client1, &client2, &client3] { + client + .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD) + .await + .unwrap(); + client + .join_consumer_group( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + &Identifier::named(CONSUMER_GROUP_NAME).unwrap(), + ) + .await + .unwrap(); + } + + // 10. Verify exactly 3 members with unique partition assignments + let cg = get_consumer_group(&root_client).await; + assert_eq!( + cg.members_count, 3, + "Expected 3 members after new clients join, got {}. Members: {:?}", + cg.members_count, cg.members + ); + assert_unique_partition_assignments(&cg); + + // 11. Verify each new client can poll from its assigned partition + let consumer = Consumer::group(Identifier::named(CONSUMER_GROUP_NAME).unwrap()); + let mut polled_partitions = HashSet::new(); + + for (i, client) in [&client1, &client2, &client3].iter().enumerate() { + let polled = client + .poll_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + None, + &consumer, + &PollingStrategy::offset(0), + 10, + false, + ) + .await + .unwrap(); + assert!( + !polled.messages.is_empty(), + "client{} should have messages but got none", + i + 1 + ); + assert!( + polled_partitions.insert(polled.partition_id), + "client{} got partition {} which was already assigned to another client! 
\ + Duplicate partition assignment detected.", + i + 1, + polled.partition_id + ); + } + + assert_eq!( + polled_partitions.len(), + PARTITIONS_COUNT as usize, + "Expected each client to poll from a unique partition" + ); + + // Cleanup + drop(stale_client1); + drop(stale_client2); + drop(stale_client3); + + root_client + .delete_stream(&Identifier::named(STREAM_NAME).unwrap()) + .await + .unwrap(); +} + +#[iggy_harness(test_client_transport = [Tcp, WebSocket, Quic], server( + heartbeat.enabled = true, + heartbeat.interval = "2s", + tcp.socket.override_defaults = true, + tcp.socket.nodelay = true +))] +async fn should_not_reshuffle_partitions_when_new_member_joins(harness: &TestHarness) { + let root_client = harness.root_client().await.unwrap(); + + // 1. Create stream, topic with 3 partitions, consumer group + setup_stream_topic_cg(&root_client).await; + + // 2. Create 2 clients, join CG → each gets partitions via incremental assign + let client1 = harness.new_client().await.unwrap(); + let client2 = harness.new_client().await.unwrap(); + + for client in [&client1, &client2] { + client + .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD) + .await + .unwrap(); + join_cg(client).await; + } + + // 3. Record the current partition assignments + let cg_before = get_consumer_group(&root_client).await; + assert_eq!(cg_before.members_count, 2); + + let assignments_before: Vec<(u32, Vec<u32>)> = cg_before + .members + .iter() + .map(|m| (m.id, m.partitions.clone())) + .collect(); + + // 4. A 3rd client joins + let client3 = harness.new_client().await.unwrap(); + client3 + .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD) + .await + .unwrap(); + join_cg(&client3).await; + + // 5. Verify: members that had exactly 1 partition still have it (stable assignment). + // Members that were over-assigned may have given up excess partitions - that's expected. + let cg_after = get_consumer_group(&root_client).await; + assert_eq!(cg_after.members_count, 3); + + for (old_id, old_partitions) in &assignments_before { + let member = cg_after.members.iter().find(|m| m.id == *old_id).unwrap(); + // A member with 1 partition must keep it (no reshuffling mid-processing) + if old_partitions.len() == 1 { + assert_eq!( + &member.partitions, old_partitions, + "Member {old_id} had exactly 1 partition {old_partitions:?} but it changed to {:?}. \ + Single-partition assignments must be stable when new members join!", + member.partitions + ); + } + // A member with multiple partitions may give up excess, but must keep at least 1 + if !old_partitions.is_empty() { + assert!( + !member.partitions.is_empty(), + "Member {old_id} had partitions {old_partitions:?} but lost all of them. \ + Existing members must keep at least their fair share." + ); + } + } + + // 6. Verify all partitions are assigned uniquely + assert_unique_partition_assignments(&cg_after); + + // Cleanup + root_client + .delete_stream(&Identifier::named(STREAM_NAME).unwrap()) + .await + .unwrap(); +} + +#[iggy_harness(test_client_transport = [Tcp, WebSocket, Quic], server( + tcp.socket.override_defaults = true, + tcp.socket.nodelay = true +))] +async fn should_not_duplicate_partition_assignments_after_client_reconnect(harness: &TestHarness) { + let root_client = harness + .root_client() + .await + .expect("Failed to get root client"); + + // 1. 
Create stream, topic with 3 partitions, consumer group + root_client.create_stream(STREAM_NAME).await.unwrap(); + root_client + .create_topic( + &Identifier::named(STREAM_NAME).unwrap(), + TOPIC_NAME, + PARTITIONS_COUNT, + CompressionAlgorithm::default(), + None, + IggyExpiry::NeverExpire, + MaxTopicSize::ServerDefault, + ) + .await + .unwrap(); + root_client + .create_consumer_group( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + CONSUMER_GROUP_NAME, + ) + .await + .unwrap(); + + // 2. Send a message to each partition + for partition_id in 0..PARTITIONS_COUNT { + let message = IggyMessage::from_str(&format!("message-partition-{partition_id}")).unwrap(); + let mut messages = vec![message]; + root_client + .send_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + &Partitioning::partition_id(partition_id), + &mut messages, + ) + .await + .unwrap(); + } + + // 3. Create 3 clients, login, join CG + let client1 = harness.new_client().await.unwrap(); + let client2 = harness.new_client().await.unwrap(); + let client3 = harness.new_client().await.unwrap(); + + for client in [&client1, &client2, &client3] { + client + .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD) + .await + .unwrap(); + client + .join_consumer_group( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + &Identifier::named(CONSUMER_GROUP_NAME).unwrap(), + ) + .await + .unwrap(); + } + + // 4. Verify initial state + let cg = get_consumer_group(&root_client).await; + assert_eq!(cg.members_count, 3, "Expected 3 members before disconnect"); + assert_unique_partition_assignments(&cg); + + // 5. Drop all 3 clients (graceful disconnect - TCP FIN sent) + drop(client1); + drop(client2); + drop(client3); + + // 6. Wait for server to process the disconnects + sleep(Duration::from_millis(500)).await; + + // 7. Create 3 new clients, join same CG + let new_client1 = harness.new_client().await.unwrap(); + let new_client2 = harness.new_client().await.unwrap(); + let new_client3 = harness.new_client().await.unwrap(); + + for client in [&new_client1, &new_client2, &new_client3] { + client + .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD) + .await + .unwrap(); + client + .join_consumer_group( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + &Identifier::named(CONSUMER_GROUP_NAME).unwrap(), + ) + .await + .unwrap(); + } + + // 8. Verify exactly 3 members with unique partition assignments + let cg = get_consumer_group(&root_client).await; + assert_eq!( + cg.members_count, 3, + "Expected exactly 3 members after reconnect, got {}. Members: {:?}", + cg.members_count, cg.members + ); + assert_unique_partition_assignments(&cg); + + // 9. Verify polling works correctly + let consumer = Consumer::group(Identifier::named(CONSUMER_GROUP_NAME).unwrap()); + let mut messages_per_client = Vec::new(); + + for client in [&new_client1, &new_client2, &new_client3] { + let polled = client + .poll_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + None, + &consumer, + &PollingStrategy::offset(0), + 10, + false, + ) + .await + .unwrap(); + messages_per_client.push(polled.messages.len()); + } + + let total_messages: usize = messages_per_client.iter().sum(); + assert_eq!( + total_messages, PARTITIONS_COUNT as usize, + "Expected {PARTITIONS_COUNT} total messages across all clients, got {total_messages}. 
\ + Messages per client: {:?}", + messages_per_client + ); + + // Cleanup + root_client + .delete_stream(&Identifier::named(STREAM_NAME).unwrap()) + .await + .unwrap(); +} + +async fn get_consumer_group(client: &IggyClient) -> ConsumerGroupDetails { + client + .get_consumer_group( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + &Identifier::named(CONSUMER_GROUP_NAME).unwrap(), + ) + .await + .unwrap() + .expect("Failed to get consumer group") +} + +fn assert_unique_partition_assignments(cg: &ConsumerGroupDetails) { + let mut all_partitions = HashSet::new(); + let mut total_assigned = 0u32; + + for member in &cg.members { + for &partition in &member.partitions { + total_assigned += 1; + assert!( + all_partitions.insert(partition), + "Partition {partition} is assigned to multiple members! \ + This means two consumers will process the same messages. \ + Consumer group members: {:?}", + cg.members + ); + } + } + + assert_eq!( + total_assigned, PARTITIONS_COUNT, + "Expected {PARTITIONS_COUNT} total partition assignments, got {total_assigned}. \ + Members: {:?}", + cg.members + ); +} + +async fn setup_stream_topic_cg(client: &IggyClient) { + client.create_stream(STREAM_NAME).await.unwrap(); + client + .create_topic( + &Identifier::named(STREAM_NAME).unwrap(), + TOPIC_NAME, + PARTITIONS_COUNT, + CompressionAlgorithm::default(), + None, + IggyExpiry::NeverExpire, + MaxTopicSize::ServerDefault, + ) + .await + .unwrap(); + client + .create_consumer_group( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + CONSUMER_GROUP_NAME, + ) + .await + .unwrap(); +} + +async fn send_one_message_per_partition(client: &IggyClient) { + for partition_id in 0..PARTITIONS_COUNT { + let message = IggyMessage::from_str(&format!("message-partition-{partition_id}")).unwrap(); + let mut messages = vec![message]; + client + .send_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + &Partitioning::partition_id(partition_id), + &mut messages, + ) + .await + .unwrap(); + } +} + +async fn join_cg(client: &IggyClient) { + client + .join_consumer_group( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + &Identifier::named(CONSUMER_GROUP_NAME).unwrap(), + ) + .await + .unwrap(); +} + +#[iggy_harness(test_client_transport = [Tcp, WebSocket, Quic], server( + heartbeat.enabled = true, + heartbeat.interval = "2s", + tcp.socket.override_defaults = true, + tcp.socket.nodelay = true +))] +async fn should_not_return_same_message_to_two_consumers_during_rebalance(harness: &TestHarness) { + let root_client = harness.root_client().await.unwrap(); + + setup_stream_topic_cg(&root_client).await; + send_one_message_per_partition(&root_client).await; + + // 1. Consumer1 joins, gets all 3 partitions + let client1 = harness.new_client().await.unwrap(); + client1 + .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD) + .await + .unwrap(); + join_cg(&client1).await; + + let cg = get_consumer_group(&root_client).await; + assert_eq!(cg.members_count, 1); + assert_eq!(cg.members[0].partitions_count, PARTITIONS_COUNT); + + // 2. 
Consumer1 polls a message WITHOUT committing (simulating in-flight processing) + let consumer = Consumer::group(Identifier::named(CONSUMER_GROUP_NAME).unwrap()); + let polled1 = client1 + .poll_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + None, + &consumer, + &PollingStrategy::next(), + 1, + false, // manual commit - offset NOT stored + ) + .await + .unwrap(); + assert_eq!(polled1.messages.len(), 1, "Consumer1 should get a message"); + let message_from_client1 = &polled1.messages[0]; + let partition_polled_by_client1 = polled1.partition_id; + + // 3. Consumer2 joins while consumer1 has in-flight work + let client2 = harness.new_client().await.unwrap(); + client2 + .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD) + .await + .unwrap(); + join_cg(&client2).await; + + // 4. Consumer2 polls - it must NOT get the same partition as consumer1's + // in-flight message. Due to cooperative rebalance, the partition with + // in-flight work stays with consumer1 until committed. + let polled2 = client2 + .poll_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + None, + &consumer, + &PollingStrategy::next(), + 1, + false, + ) + .await + .unwrap(); + + if !polled2.messages.is_empty() { + assert_ne!( + polled2.partition_id, + partition_polled_by_client1, + "Consumer2 got partition {} which consumer1 is still processing! \ + Message ID from consumer1: offset={}, partition={}. \ + Message ID from consumer2: offset={}, partition={}. \ + This is the duplicate processing bug!", + polled2.partition_id, + message_from_client1.header.offset, + partition_polled_by_client1, + polled2.messages[0].header.offset, + polled2.partition_id, + ); + } + + // 5. Now consumer1 commits the offset + client1 + .store_consumer_offset( + &consumer, + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + Some(partition_polled_by_client1), + message_from_client1.header.offset, + ) + .await + .unwrap(); + + // 6. Give the server a moment to process the revocation completion + sleep(Duration::from_millis(100)).await; + + // 7. After commit, the partition should have transferred. Verify final state. + let cg = get_consumer_group(&root_client).await; + assert_eq!(cg.members_count, 2); + assert_unique_partition_assignments(&cg); + + // Cleanup + root_client + .delete_stream(&Identifier::named(STREAM_NAME).unwrap()) + .await + .unwrap(); +} + +#[iggy_harness(test_client_transport = [Tcp, WebSocket, Quic], server( + heartbeat.enabled = true, + heartbeat.interval = "2s", + tcp.socket.override_defaults = true, + tcp.socket.nodelay = true +))] +async fn should_complete_revocation_on_auto_commit(harness: &TestHarness) { + let root_client = harness.root_client().await.unwrap(); + + setup_stream_topic_cg(&root_client).await; + send_one_message_per_partition(&root_client).await; + + // 1. Consumer1 joins, polls with auto_commit=true + let client1 = harness.new_client().await.unwrap(); + client1 + .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD) + .await + .unwrap(); + join_cg(&client1).await; + + let consumer = Consumer::group(Identifier::named(CONSUMER_GROUP_NAME).unwrap()); + + // Poll all 3 partitions with auto-commit + for _ in 0..PARTITIONS_COUNT { + client1 + .poll_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + None, + &consumer, + &PollingStrategy::next(), + 1, + true, // auto-commit + ) + .await + .unwrap(); + } + + // 2. 
Consumer2 joins - since consumer1 already committed all offsets, + // the revocations should complete immediately. + let client2 = harness.new_client().await.unwrap(); + client2 + .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD) + .await + .unwrap(); + join_cg(&client2).await; + + // Small delay for revocation processing + sleep(Duration::from_millis(100)).await; + + // 3. Verify partitions are distributed + let cg = get_consumer_group(&root_client).await; + assert_eq!(cg.members_count, 2); + assert_unique_partition_assignments(&cg); + + // Both members should have partitions (not all stuck on consumer1) + for member in &cg.members { + assert!( + !member.partitions.is_empty(), + "Member {} has no partitions - revocation may not have completed. Members: {:?}", + member.id, + cg.members + ); + } + + // Cleanup + root_client + .delete_stream(&Identifier::named(STREAM_NAME).unwrap()) + .await + .unwrap(); +} + +#[iggy_harness(test_client_transport = [Tcp, WebSocket, Quic], server( + heartbeat.enabled = true, + heartbeat.interval = "2s", + tcp.socket.override_defaults = true, + tcp.socket.nodelay = true +))] +async fn should_transfer_never_polled_partitions_immediately(harness: &TestHarness) { + let root_client = harness.root_client().await.unwrap(); + + setup_stream_topic_cg(&root_client).await; + // NOTE: no messages sent - all partitions are empty + + // 1. Consumer1 joins, gets all 3 partitions + let client1 = harness.new_client().await.unwrap(); + client1 + .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD) + .await + .unwrap(); + join_cg(&client1).await; + + // 2. Consumer2 joins - partitions should be distributed immediately + // because consumer1 never polled anything + let client2 = harness.new_client().await.unwrap(); + client2 + .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD) + .await + .unwrap(); + join_cg(&client2).await; + + // 3. Consumer3 joins + let client3 = harness.new_client().await.unwrap(); + client3 + .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD) + .await + .unwrap(); + join_cg(&client3).await; + + // 4. All 3 members should have exactly 1 partition each - immediately, + // no waiting for commits + let cg = get_consumer_group(&root_client).await; + assert_eq!(cg.members_count, 3); + assert_unique_partition_assignments(&cg); + + for member in &cg.members { + assert_eq!( + member.partitions_count, 1, + "Each member should have exactly 1 partition. Member {} has {}. Members: {:?}", + member.id, member.partitions_count, cg.members + ); + } + + // Cleanup + root_client + .delete_stream(&Identifier::named(STREAM_NAME).unwrap()) + .await + .unwrap(); +} + +#[iggy_harness(test_client_transport = [Tcp, WebSocket, Quic], server( + heartbeat.enabled = true, + heartbeat.interval = "2s", + tcp.socket.override_defaults = true, + tcp.socket.nodelay = true +))] +async fn should_rebalance_when_member_with_pending_revocation_leaves(harness: &TestHarness) { + let root_client = harness.root_client().await.unwrap(); + + setup_stream_topic_cg(&root_client).await; + send_one_message_per_partition(&root_client).await; + + // 1. 
Consumer1 joins, polls WITHOUT commit (creates in-flight work) + let client1 = harness.new_client().await.unwrap(); + client1 + .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD) + .await + .unwrap(); + join_cg(&client1).await; + + let consumer = Consumer::group(Identifier::named(CONSUMER_GROUP_NAME).unwrap()); + let _polled = client1 + .poll_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + None, + &consumer, + &PollingStrategy::next(), + 1, + false, + ) + .await + .unwrap(); + + // 2. Consumer2 joins - some partitions become pending revocation + let client2 = harness.new_client().await.unwrap(); + client2 + .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD) + .await + .unwrap(); + join_cg(&client2).await; + + // 3. Consumer1 disconnects (graceful) - should trigger full rebalance + // clearing all pending revocations + drop(client1); + sleep(Duration::from_millis(500)).await; + + // 4. Consumer2 should now get ALL partitions via full rebalance + let cg = get_consumer_group(&root_client).await; + assert_eq!( + cg.members_count, 1, + "Only consumer2 should remain. Members: {:?}", + cg.members + ); + assert_eq!( + cg.members[0].partitions_count, PARTITIONS_COUNT, + "Consumer2 should have all partitions after consumer1 left. Members: {:?}", + cg.members + ); + + // 5. Consumer2 can actually poll from all partitions + let mut polled_partitions = HashSet::new(); + for _ in 0..PARTITIONS_COUNT { + let polled = client2 + .poll_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + None, + &consumer, + &PollingStrategy::offset(0), + 1, + false, + ) + .await + .unwrap(); + if !polled.messages.is_empty() { + polled_partitions.insert(polled.partition_id); + } + } + assert_eq!( + polled_partitions.len(), + PARTITIONS_COUNT as usize, + "Consumer2 should be able to poll from all {PARTITIONS_COUNT} partitions" + ); + + // Cleanup + root_client + .delete_stream(&Identifier::named(STREAM_NAME).unwrap()) + .await + .unwrap(); +} + +#[iggy_harness(test_client_transport = [Tcp, WebSocket, Quic], server( + heartbeat.enabled = true, + heartbeat.interval = "2s", + tcp.socket.override_defaults = true, + tcp.socket.nodelay = true +))] +async fn should_not_produce_duplicate_messages_with_sequential_consumer_joins( + harness: &TestHarness, +) { + let root_client = harness.root_client().await.unwrap(); + + setup_stream_topic_cg(&root_client).await; + + // Send 5 messages to each partition + for partition_id in 0..PARTITIONS_COUNT { + for msg_idx in 0..5u32 { + let message = IggyMessage::from_str(&format!("p{partition_id}-msg{msg_idx}")).unwrap(); + let mut messages = vec![message]; + root_client + .send_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + &Partitioning::partition_id(partition_id), + &mut messages, + ) + .await + .unwrap(); + } + } + + let consumer = Consumer::group(Identifier::named(CONSUMER_GROUP_NAME).unwrap()); + + // 1. Consumer1 joins, gets all 3 partitions + let client1 = harness.new_client().await.unwrap(); + client1 + .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD) + .await + .unwrap(); + join_cg(&client1).await; + + // 2. 
Consumer1 polls 1 message with auto-commit (like the production app does) + let polled1 = client1 + .poll_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + None, + &consumer, + &PollingStrategy::next(), + 1, + true, // auto-commit like production + ) + .await + .unwrap(); + assert_eq!(polled1.messages.len(), 1); + let first_partition = polled1.partition_id; + let first_offset = polled1.messages[0].header.offset; + + // 3. Consumer2 joins ~50ms later (simulating production timing) + sleep(Duration::from_millis(50)).await; + let client2 = harness.new_client().await.unwrap(); + client2 + .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD) + .await + .unwrap(); + join_cg(&client2).await; + + // 4. Consumer3 joins ~50ms later + sleep(Duration::from_millis(50)).await; + let client3 = harness.new_client().await.unwrap(); + client3 + .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD) + .await + .unwrap(); + join_cg(&client3).await; + + // 5. Small delay for revocation completions + sleep(Duration::from_millis(200)).await; + + // 6. All 3 consumers poll messages - collect ALL messages across all consumers + let mut all_messages: Vec<(u32, u64)> = Vec::new(); // (partition_id, offset) + + // Already got one from consumer1 + all_messages.push((first_partition, first_offset)); + + // Each consumer polls multiple times to drain their partitions + for client in [&client1, &client2, &client3] { + for _ in 0..20 { + let polled = client + .poll_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + None, + &consumer, + &PollingStrategy::next(), + 1, + true, + ) + .await + .unwrap(); + if polled.messages.is_empty() { + break; + } + for msg in &polled.messages { + all_messages.push((polled.partition_id, msg.header.offset)); + } + } + } + + // 7. Check for duplicates: same (partition_id, offset) must never appear twice + let mut seen = HashSet::new(); + for (partition_id, offset) in &all_messages { + assert!( + seen.insert((*partition_id, *offset)), + "DUPLICATE MESSAGE DETECTED! partition={partition_id}, offset={offset}. \ + This is the exact production bug - two consumers processed the same message. \ + All messages: {all_messages:?}" + ); + } + + // 8. Verify we got all 15 messages (5 per partition x 3 partitions) + assert_eq!( + all_messages.len(), + 15, + "Expected 15 total messages (5 per partition x 3 partitions), got {}. \ + Messages: {:?}", + all_messages.len(), + all_messages + ); + + // Cleanup + root_client + .delete_stream(&Identifier::named(STREAM_NAME).unwrap()) + .await + .unwrap(); +} + +#[iggy_harness(test_client_transport = [Tcp, WebSocket, Quic], server( + heartbeat.enabled = true, + heartbeat.interval = "2s", + tcp.socket.override_defaults = true, + tcp.socket.nodelay = true +))] +async fn should_wait_for_manual_commit_before_completing_revocation(harness: &TestHarness) { + let root_client = harness.root_client().await.unwrap(); + + setup_stream_topic_cg(&root_client).await; + send_one_message_per_partition(&root_client).await; + + // 1. 
Client1 joins, rapidly polls all 3 partitions WITHOUT committing + let client1 = harness.new_client().await.unwrap(); + client1 + .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD) + .await + .unwrap(); + join_cg(&client1).await; + + let consumer = Consumer::group(Identifier::named(CONSUMER_GROUP_NAME).unwrap()); + let mut polled_offsets: Vec<(u32, u64)> = Vec::new(); + + for _ in 0..PARTITIONS_COUNT { + let polled = client1 + .poll_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + None, + &consumer, + &PollingStrategy::next(), + 1, + false, // manual commit + ) + .await + .unwrap(); + assert_eq!(polled.messages.len(), 1); + polled_offsets.push((polled.partition_id, polled.messages[0].header.offset)); + } + + // 2. Consumer2 joins - all partitions have in-flight work + let client2 = harness.new_client().await.unwrap(); + client2 + .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD) + .await + .unwrap(); + join_cg(&client2).await; + + sleep(Duration::from_millis(100)).await; + + // 3. Client2 should get NOTHING - all partitions have pending revocations + // that can't complete because client1 hasn't committed + let polled2 = client2 + .poll_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + None, + &consumer, + &PollingStrategy::next(), + 1, + false, + ) + .await + .unwrap(); + assert!( + polled2.messages.is_empty(), + "Client2 should get NO messages while client1 has uncommitted in-flight work \ + on all partitions, but got {} messages from partition {}", + polled2.messages.len(), + polled2.partition_id + ); + + // 4. Client1 commits each partition one by one + for (partition_id, offset) in &polled_offsets { + client1 + .store_consumer_offset( + &consumer, + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + Some(*partition_id), + *offset, + ) + .await + .unwrap(); + } + + sleep(Duration::from_millis(200)).await; + + // 5. Now consumer2 should have partitions + let cg = get_consumer_group(&root_client).await; + assert_eq!(cg.members_count, 2); + assert_unique_partition_assignments(&cg); + + for member in &cg.members { + assert!( + !member.partitions.is_empty(), + "Member {} has no partitions after client1 committed. Members: {:?}", + member.id, + cg.members + ); + } + + // Cleanup + root_client + .delete_stream(&Identifier::named(STREAM_NAME).unwrap()) + .await + .unwrap(); +} + +#[iggy_harness(test_client_transport = [Tcp, WebSocket, Quic], server( + heartbeat.enabled = true, + heartbeat.interval = "2s", + tcp.socket.override_defaults = true, + tcp.socket.nodelay = true +))] +async fn should_redistribute_when_revocation_target_leaves(harness: &TestHarness) { + let root_client = harness.root_client().await.unwrap(); + + setup_stream_topic_cg(&root_client).await; + send_one_message_per_partition(&root_client).await; + + // 1. Consumer1 joins, polls without commit (creates in-flight work) + let client1 = harness.new_client().await.unwrap(); + client1 + .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD) + .await + .unwrap(); + join_cg(&client1).await; + + let consumer = Consumer::group(Identifier::named(CONSUMER_GROUP_NAME).unwrap()); + let _polled = client1 + .poll_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + None, + &consumer, + &PollingStrategy::next(), + 1, + false, + ) + .await + .unwrap(); + + // 2. 
Consumer2 joins - gets some partitions, some become pending revocation + let client2 = harness.new_client().await.unwrap(); + client2 + .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD) + .await + .unwrap(); + join_cg(&client2).await; + + sleep(Duration::from_millis(100)).await; + + // 3. Consumer2 (the revocation TARGET) disconnects before revocation completes + drop(client2); + sleep(Duration::from_millis(500)).await; + + // 4. Consumer1 should now have ALL partitions back via full rebalance + let cg = get_consumer_group(&root_client).await; + assert_eq!( + cg.members_count, 1, + "Only consumer1 should remain. Members: {:?}", + cg.members + ); + assert_eq!( + cg.members[0].partitions_count, PARTITIONS_COUNT, + "Consumer1 should have all {} partitions after target left. Members: {:?}", + PARTITIONS_COUNT, cg.members + ); + + // 5. Consumer1 can poll from all partitions + let mut polled_partitions = HashSet::new(); + for _ in 0..PARTITIONS_COUNT { + let polled = client1 + .poll_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + None, + &consumer, + &PollingStrategy::offset(0), + 1, + false, + ) + .await + .unwrap(); + if !polled.messages.is_empty() { + polled_partitions.insert(polled.partition_id); + } + } + assert_eq!( + polled_partitions.len(), + PARTITIONS_COUNT as usize, + "Consumer1 should poll from all partitions after target left" + ); + + // Cleanup + root_client + .delete_stream(&Identifier::named(STREAM_NAME).unwrap()) + .await + .unwrap(); +} + +#[iggy_harness(test_client_transport = [Tcp, WebSocket, Quic], server( + heartbeat.enabled = true, + heartbeat.interval = "2s", + tcp.socket.override_defaults = true, + tcp.socket.nodelay = true +))] +async fn should_distribute_partitions_evenly_with_concurrent_joins(harness: &TestHarness) { + let root_client = harness.root_client().await.unwrap(); + + setup_stream_topic_cg(&root_client).await; + send_one_message_per_partition(&root_client).await; + + // Create 3 clients and login + let client1 = harness.new_client().await.unwrap(); + let client2 = harness.new_client().await.unwrap(); + let client3 = harness.new_client().await.unwrap(); + + for client in [&client1, &client2, &client3] { + client + .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD) + .await + .unwrap(); + } + + // Join all 3 concurrently + let stream_id = Identifier::named(STREAM_NAME).unwrap(); + let topic_id = Identifier::named(TOPIC_NAME).unwrap(); + let group_id = Identifier::named(CONSUMER_GROUP_NAME).unwrap(); + + let (r1, r2, r3) = tokio::join!( + client1.join_consumer_group(&stream_id, &topic_id, &group_id), + client2.join_consumer_group(&stream_id, &topic_id, &group_id), + client3.join_consumer_group(&stream_id, &topic_id, &group_id), + ); + r1.unwrap(); + r2.unwrap(); + r3.unwrap(); + + // Small delay for any pending revocation completions + sleep(Duration::from_millis(200)).await; + + // Verify: 3 members, each with exactly 1 partition + let cg = get_consumer_group(&root_client).await; + assert_eq!( + cg.members_count, 3, + "Expected 3 members after concurrent joins, got {}. Members: {:?}", + cg.members_count, cg.members + ); + assert_unique_partition_assignments(&cg); + + for member in &cg.members { + assert_eq!( + member.partitions_count, 1, + "Each member should have exactly 1 partition with concurrent joins. \ + Member {} has {}. 
Members: {:?}", + member.id, member.partitions_count, cg.members + ); + } + + // Each consumer polls - must get unique partitions + let consumer = Consumer::group(Identifier::named(CONSUMER_GROUP_NAME).unwrap()); + let mut polled_partitions = HashSet::new(); + + for (i, client) in [&client1, &client2, &client3].iter().enumerate() { + let polled = client + .poll_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + None, + &consumer, + &PollingStrategy::offset(0), + 10, + false, + ) + .await + .unwrap(); + assert!( + !polled.messages.is_empty(), + "Client {i} should have messages but got none" + ); + assert!( + polled_partitions.insert(polled.partition_id), + "Client {i} got partition {} already taken by another client! Duplicate!", + polled.partition_id + ); + } + + assert_eq!( + polled_partitions.len(), + 3, + "All 3 partitions must be covered" + ); + + // Cleanup + root_client + .delete_stream(&Identifier::named(STREAM_NAME).unwrap()) + .await + .unwrap(); +} diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml index 995a65ca3..7d5b86867 100644 --- a/core/server/Cargo.toml +++ b/core/server/Cargo.toml @@ -17,7 +17,7 @@ [package] name = "server" -version = "0.6.3-edge.2" +version = "0.6.3-edge.3" edition = "2024" license = "Apache-2.0" diff --git a/core/server/src/metadata/absorb.rs b/core/server/src/metadata/absorb.rs index 50af5b514..f02289e3c 100644 --- a/core/server/src/metadata/absorb.rs +++ b/core/server/src/metadata/absorb.rs @@ -19,17 +19,18 @@ use crate::metadata::ConsumerGroupMemberMeta; use crate::metadata::inner::InnerMetadata; use crate::metadata::ops::MetadataOp; use crate::metadata::{StreamId, UserId}; +use crate::streaming::polling_consumer::ConsumerGroupId; use iggy_common::Permissions; use left_right::Absorb; use std::sync::atomic::Ordering; impl Absorb<MetadataOp> for InnerMetadata { - fn absorb_first(&mut self, op: &mut MetadataOp, _other: &Self) { - apply_op(self, op, true); + fn absorb_first(&mut self, op: &mut MetadataOp, other: &Self) { + apply_op(self, op, true, other); } - fn absorb_second(&mut self, op: MetadataOp, _other: &Self) { - apply_op(self, &op, false); + fn absorb_second(&mut self, op: MetadataOp, other: &Self) { + apply_op(self, &op, false, other); } fn sync_with(&mut self, first: &Self) { @@ -37,7 +38,12 @@ impl Absorb<MetadataOp> for InnerMetadata { } } -fn apply_op(metadata: &mut InnerMetadata, op: &MetadataOp, populate_ids: bool) { +fn apply_op( + metadata: &mut InnerMetadata, + op: &MetadataOp, + populate_ids: bool, + reader_copy: &InnerMetadata, +) { match op { MetadataOp::Initialize(initial) => { *metadata = (**initial).clone(); @@ -301,7 +307,29 @@ fn apply_op(metadata: &mut InnerMetadata, op: &MetadataOp, populate_ids: bool) { let new_member = ConsumerGroupMemberMeta::new(next_id, *client_id); group.members.insert(new_member); - group.rebalance_members(); + group.assign_partitions_to_new_members(); + } + + // Complete revocations for partitions with no in-flight work + if let Some(stream) = metadata.streams.get_mut(*stream_id) + && let Some(topic) = stream.topics.get_mut(*topic_id) + { + let cg_id = ConsumerGroupId(*group_id); + let reader_group = reader_copy + .streams + .get(*stream_id) + .and_then(|s| s.topics.get(*topic_id)) + .and_then(|t| t.consumer_groups.get(*group_id)); + let completable = topic + .consumer_groups + .get(*group_id) + .map(|g| g.find_completable_revocations(&topic.partitions, cg_id, reader_group)) + .unwrap_or_default(); + if let Some(group) = 
topic.consumer_groups.get_mut(*group_id) { + for (member_slab_id, member_id, partition_id) in completable { + group.complete_revocation(member_slab_id, member_id, partition_id); + } + } } } @@ -351,6 +379,22 @@ fn apply_op(metadata: &mut InnerMetadata, op: &MetadataOp, populate_ids: bool) { } } } + + MetadataOp::CompletePartitionRevocation { + stream_id, + topic_id, + group_id, + member_slab_id, + member_id, + partition_id, + } => { + if let Some(stream) = metadata.streams.get_mut(*stream_id) + && let Some(topic) = stream.topics.get_mut(*topic_id) + && let Some(group) = topic.consumer_groups.get_mut(*group_id) + { + group.complete_revocation(*member_slab_id, *member_id, *partition_id); + } + } } } diff --git a/core/server/src/metadata/consumer_group.rs b/core/server/src/metadata/consumer_group.rs index d44dd5ecb..4c4bff942 100644 --- a/core/server/src/metadata/consumer_group.rs +++ b/core/server/src/metadata/consumer_group.rs @@ -16,9 +16,13 @@ // under the License. use crate::metadata::consumer_group_member::ConsumerGroupMemberMeta; +use crate::metadata::partition::PartitionMeta; use crate::metadata::{ConsumerGroupId, PartitionId}; +use crate::streaming::polling_consumer::ConsumerGroupId as CgId; use slab::Slab; +use std::collections::HashSet; use std::sync::Arc; +use std::sync::atomic::Ordering; #[derive(Clone, Debug)] pub struct ConsumerGroupMeta { @@ -29,7 +33,8 @@ pub struct ConsumerGroupMeta { } impl ConsumerGroupMeta { - /// Rebalance partition assignments among members (round-robin). + /// Full rebalance: clear all assignments and redistribute round-robin. + /// Used when a member leaves or partition count changes. pub fn rebalance_members(&mut self) { let partition_count = self.partitions.len(); let member_count = self.members.len(); @@ -38,11 +43,12 @@ impl ConsumerGroupMeta { return; } - // Clear all member partitions + // Clear all member partitions and pending revocations let member_ids: Vec<usize> = self.members.iter().map(|(id, _)| id).collect(); for &member_id in &member_ids { if let Some(member) = self.members.get_mut(member_id) { member.partitions.clear(); + member.pending_revocations.clear(); } } @@ -56,4 +62,210 @@ impl ConsumerGroupMeta { } } } + + /// Incremental partition assignment for a new member join. 
+ pub fn assign_partitions_to_new_members(&mut self) { + let member_count = self.members.len(); + if member_count == 0 || self.partitions.is_empty() { + return; + } + + // Find which partitions are already assigned + let mut assigned: HashSet<PartitionId> = HashSet::new(); + for (_, member) in self.members.iter() { + for &pid in &member.partitions { + assigned.insert(pid); + } + } + + // Step 1: Assign unassigned partitions to idle members + let unassigned: Vec<PartitionId> = self + .partitions + .iter() + .copied() + .filter(|pid| !assigned.contains(pid)) + .collect(); + + if !unassigned.is_empty() { + let idle_member_ids: Vec<usize> = self + .members + .iter() + .filter(|(_, m)| m.partitions.is_empty()) + .map(|(id, _)| id) + .collect(); + + if !idle_member_ids.is_empty() { + for (i, partition_id) in unassigned.into_iter().enumerate() { + let member_idx = i % idle_member_ids.len(); + if let Some(member) = self.members.get_mut(idle_member_ids[member_idx]) { + member.partitions.push(partition_id); + } + } + } + } + + // Step 2: Mark excess partitions as pending revocation + let partition_count = self.partitions.len(); + let fair_share = partition_count / member_count; + let remainder = partition_count % member_count; + + // Collect members already targeted by a pending revocation + let revocation_targets: HashSet<usize> = self + .members + .iter() + .flat_map(|(_, m)| m.pending_revocations.iter().map(|(_, target)| *target)) + .collect(); + + // Collect idle members (no partitions and not already a revocation target) + let mut idle_slab_ids: Vec<usize> = self + .members + .iter() + .filter(|(id, m)| m.partitions.is_empty() && !revocation_targets.contains(id)) + .map(|(id, _)| id) + .collect(); + + if idle_slab_ids.is_empty() { + return; + } + + // Find over-assigned members and mark excess as pending revocation + let member_ids: Vec<usize> = self.members.iter().map(|(id, _)| id).collect(); + let mut members_with_remainder = remainder; + + for &mid in &member_ids { + if idle_slab_ids.is_empty() { + break; + } + let effective_count = self + .members + .get(mid) + .map(|m| { + let pending: HashSet<PartitionId> = + m.pending_revocations.iter().map(|(pid, _)| *pid).collect(); + m.partitions.iter().filter(|p| !pending.contains(p)).count() + }) + .unwrap_or(0); + + // This member's max allowed partitions + let max_allowed = if members_with_remainder > 0 { + fair_share + 1 + } else { + fair_share + }; + + if effective_count <= max_allowed { + if effective_count > fair_share { + members_with_remainder = members_with_remainder.saturating_sub(1); + } + continue; + } + + // Mark excess partitions as pending revocation (from the END of the list) + let excess_count = effective_count - max_allowed; + if members_with_remainder > 0 && effective_count > fair_share { + members_with_remainder = members_with_remainder.saturating_sub(1); + } + + // Collect partitions eligible for revocation (not already pending) + let revocable: Vec<PartitionId> = self + .members + .get(mid) + .map(|m| { + let pending: HashSet<PartitionId> = + m.pending_revocations.iter().map(|(pid, _)| *pid).collect(); + m.partitions + .iter() + .rev() + .filter(|p| !pending.contains(p)) + .copied() + .collect() + }) + .unwrap_or_default(); + + for partition_id in revocable.into_iter().take(excess_count) { + if idle_slab_ids.is_empty() { + break; + } + let idle_id = idle_slab_ids.remove(0); + if let Some(member) = self.members.get_mut(mid) { + member.pending_revocations.push((partition_id, idle_id)); + } + } + } + } + + /// Find revocations 
completable immediately (never polled or already committed). + pub fn find_completable_revocations( + &self, + partitions: &[PartitionMeta], + cg_id: CgId, + reader_group: Option<&ConsumerGroupMeta>, + ) -> Vec<(usize, usize, PartitionId)> { + let mut result = Vec::new(); + for (slab_id, member) in self.members.iter() { + // Read last_polled_offsets from the reader copy (where data shards write) + let reader_offsets = reader_group + .and_then(|rg| rg.members.get(slab_id)) + .map(|rm| &rm.last_polled_offsets); + + for &(pid, _) in &member.pending_revocations { + let last_polled = reader_offsets.and_then(|offsets| { + let guard = offsets.pin(); + guard.get(&pid).map(|v| v.load(Ordering::Relaxed)) + }); + let can_complete = match last_polled { + None => true, + Some(polled) => partitions + .get(pid) + .and_then(|p| { + let g = p.consumer_group_offsets.pin(); + g.get(&cg_id).map(|co| co.offset.load(Ordering::Relaxed)) + }) + .is_some_and(|committed| committed >= polled), + }; + if can_complete { + result.push((slab_id, member.id, pid)); + } + } + } + result + } + + /// Complete a pending revocation, moving the partition to the target member. + pub fn complete_revocation( + &mut self, + member_slab_id: usize, + member_id: usize, + partition_id: PartitionId, + ) -> bool { + let target_slab_id = if let Some(member) = self.members.get_mut(member_slab_id) { + if member.id != member_id { + return false; + } + let pos = member + .pending_revocations + .iter() + .position(|(pid, _)| *pid == partition_id); + if let Some(pos) = pos { + let (_, target) = member.pending_revocations.remove(pos); + member.partitions.retain(|&p| p != partition_id); + Some(target) + } else { + None + } + } else { + None + }; + + if let Some(target) = target_slab_id { + if let Some(target_member) = self.members.get_mut(target) { + target_member.partitions.push(partition_id); + return true; + } + self.rebalance_members(); + return true; + } + + false + } } diff --git a/core/server/src/metadata/consumer_group_member.rs b/core/server/src/metadata/consumer_group_member.rs index 8543351cb..fe78e593b 100644 --- a/core/server/src/metadata/consumer_group_member.rs +++ b/core/server/src/metadata/consumer_group_member.rs @@ -17,7 +17,7 @@ use crate::metadata::{ClientId, ConsumerGroupMemberId, PartitionId}; use std::sync::Arc; -use std::sync::atomic::AtomicUsize; +use std::sync::atomic::{AtomicU64, AtomicUsize}; #[derive(Clone, Debug)] pub struct ConsumerGroupMemberMeta { @@ -25,6 +25,10 @@ pub struct ConsumerGroupMemberMeta { pub client_id: ClientId, pub partitions: Vec<PartitionId>, pub partition_index: Arc<AtomicUsize>, + /// Partitions pending cooperative revocation: (partition_id, target_member_slab_id). + pub pending_revocations: Vec<(PartitionId, usize)>, + /// Last offset returned per partition during poll, for cooperative rebalance. 
+ pub last_polled_offsets: Arc<papaya::HashMap<PartitionId, Arc<AtomicU64>>>, } impl ConsumerGroupMemberMeta { @@ -34,6 +38,8 @@ impl ConsumerGroupMemberMeta { client_id, partitions: Vec::new(), partition_index: Arc::new(AtomicUsize::new(0)), + pending_revocations: Vec::new(), + last_polled_offsets: Arc::new(papaya::HashMap::new()), } } } diff --git a/core/server/src/metadata/ops.rs b/core/server/src/metadata/ops.rs index 4c0328ee5..ce6d5915a 100644 --- a/core/server/src/metadata/ops.rs +++ b/core/server/src/metadata/ops.rs @@ -17,8 +17,8 @@ use crate::metadata::inner::InnerMetadata; use crate::metadata::{ - ConsumerGroupId, ConsumerGroupMeta, PartitionMeta, StreamId, StreamMeta, TopicId, TopicMeta, - UserId, UserMeta, + ConsumerGroupId, ConsumerGroupMeta, PartitionId, PartitionMeta, StreamId, StreamMeta, TopicId, + TopicMeta, UserId, UserMeta, }; use iggy_common::{CompressionAlgorithm, IggyExpiry, MaxTopicSize, PersonalAccessToken}; use std::sync::Arc; @@ -119,4 +119,12 @@ pub enum MetadataOp { topic_id: TopicId, partitions_count: u32, }, + CompletePartitionRevocation { + stream_id: StreamId, + topic_id: TopicId, + group_id: ConsumerGroupId, + member_slab_id: usize, + member_id: usize, + partition_id: PartitionId, + }, } diff --git a/core/server/src/metadata/reader.rs b/core/server/src/metadata/reader.rs index d2ff0dd9a..bae83c982 100644 --- a/core/server/src/metadata/reader.rs +++ b/core/server/src/metadata/reader.rs @@ -271,25 +271,117 @@ impl Metadata { .members .get(member_id)?; - let assigned_partitions = &member.partitions; - if assigned_partitions.is_empty() { + // Fast path: no pending revocations + if member.pending_revocations.is_empty() { + let partitions = &member.partitions; + let count = partitions.len(); + if count == 0 { + return None; + } + let counter = &member.partition_index; + return if calculate { + let current = counter + .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |c| { + Some((c + 1) % count) + }) + .unwrap(); + Some(partitions[current % count]) + } else { + let current = counter.load(Ordering::Relaxed); + Some(partitions[current % count]) + }; + } + + // Slow path: skip revoked partitions via linear scan + let effective_count = member.partitions.len() - member.pending_revocations.len(); + if effective_count == 0 { return None; } - let partitions_count = assigned_partitions.len(); let counter = &member.partition_index; - - if calculate { - let current = counter - .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| { - Some((current + 1) % partitions_count) + let idx = if calculate { + counter + .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |c| { + Some((c + 1) % effective_count) }) - .unwrap(); - Some(assigned_partitions[current % partitions_count]) + .unwrap() + % effective_count } else { - let current = counter.load(Ordering::Relaxed); - Some(assigned_partitions[current % partitions_count]) + counter.load(Ordering::Relaxed) % effective_count + }; + + // Find the idx-th non-revoked partition + let mut seen = 0; + for &pid in &member.partitions { + let is_revoked = member + .pending_revocations + .iter() + .any(|(rpid, _)| *rpid == pid); + if is_revoked { + continue; + } + if seen == idx { + return Some(pid); + } + seen += 1; } + + None + } + + /// Record the last offset returned to a CG member during poll. 
+ pub fn record_polled_offset( + &self, + stream_id: StreamId, + topic_id: TopicId, + group_id: ConsumerGroupId, + member_id: usize, + partition_id: PartitionId, + offset: u64, + ) { + let metadata = self.load(); + if let Some(member) = metadata + .streams + .get(stream_id) + .and_then(|s| s.topics.get(topic_id)) + .and_then(|t| t.consumer_groups.get(group_id)) + .and_then(|g| g.members.get(member_id)) + { + let guard = member.last_polled_offsets.pin(); + match guard.get(&partition_id) { + Some(existing) => { + existing.store(offset, Ordering::Relaxed); + } + None => { + guard.insert( + partition_id, + Arc::new(std::sync::atomic::AtomicU64::new(offset)), + ); + } + } + } + } + + /// Get the last offset polled by a CG member for a partition. + pub fn get_last_polled_offset( + &self, + stream_id: StreamId, + topic_id: TopicId, + group_id: ConsumerGroupId, + member_id: usize, + partition_id: PartitionId, + ) -> Option<u64> { + let metadata = self.load(); + metadata + .streams + .get(stream_id) + .and_then(|s| s.topics.get(topic_id)) + .and_then(|t| t.consumer_groups.get(group_id)) + .and_then(|g| g.members.get(member_id)) + .and_then(|m| { + let guard = m.last_polled_offsets.pin(); + guard.get(&partition_id).map(|v| v.load(Ordering::Relaxed)) + }) } pub fn users_count(&self) -> usize { diff --git a/core/server/src/metadata/writer.rs b/core/server/src/metadata/writer.rs index 2e3a409e9..3023bd00e 100644 --- a/core/server/src/metadata/writer.rs +++ b/core/server/src/metadata/writer.rs @@ -295,6 +295,26 @@ impl MetadataWriter { self.publish(); } + pub fn complete_partition_revocation( + &mut self, + stream_id: StreamId, + topic_id: TopicId, + group_id: ConsumerGroupId, + member_slab_id: usize, + member_id: usize, + partition_id: PartitionId, + ) { + self.append(MetadataOp::CompletePartitionRevocation { + stream_id, + topic_id, + group_id, + member_slab_id, + member_id, + partition_id, + }); + self.publish(); + } + // High-level registration methods with validation pub fn create_stream( diff --git a/core/server/src/shard/handlers.rs b/core/server/src/shard/handlers.rs index a2e14800c..7cbbb8368 100644 --- a/core/server/src/shard/handlers.rs +++ b/core/server/src/shard/handlers.rs @@ -352,6 +352,30 @@ async fn handle_request( Ok(ShardResponse::LeaveConsumerGroupMetadataOnlyResponse) } + ShardRequestPayload::CompletePartitionRevocation { + stream_id, + topic_id, + group_id, + member_slab_id, + member_id, + partition_id, + } => { + assert_eq!( + shard.id, 0, + "CompletePartitionRevocation should only be handled by shard0" + ); + + shard.writer().complete_partition_revocation( + stream_id, + topic_id, + group_id, + member_slab_id, + member_id, + partition_id, + ); + + Ok(ShardResponse::CompletePartitionRevocationResponse) + } ShardRequestPayload::SocketTransfer { fd, from_shard, diff --git a/core/server/src/shard/system/consumer_offsets.rs b/core/server/src/shard/system/consumer_offsets.rs index e8f76cde5..717ff1caa 100644 --- a/core/server/src/shard/system/consumer_offsets.rs +++ b/core/server/src/shard/system/consumer_offsets.rs @@ -19,7 +19,7 @@ use super::COMPONENT; use crate::{ shard::IggyShard, - shard::transmission::message::ResolvedTopic, + shard::transmission::message::{ResolvedTopic, ShardRequest, ShardRequestPayload}, streaming::{ partitions::consumer_offset::ConsumerOffset, polling_consumer::{ConsumerGroupId, PollingConsumer}, @@ -76,6 +76,15 @@ impl IggyShard { partition_id, ) .await?; + + self.maybe_complete_pending_revocation( + &polling_consumer, + topic.stream_id, + topic.topic_id, + 
partition_id, + ) + .await; + Ok((polling_consumer, partition_id)) } @@ -414,6 +423,66 @@ impl IggyShard { Ok(()) } + /// Complete a pending partition revocation if this offset commit satisfies it. + pub(crate) async fn maybe_complete_pending_revocation( + &self, + polling_consumer: &PollingConsumer, + stream_id: usize, + topic_id: usize, + partition_id: usize, + ) { + let PollingConsumer::ConsumerGroup(group_id, member_id) = polling_consumer else { + return; + }; + + let completion_info = self + .metadata + .get_consumer_group(stream_id, topic_id, group_id.0) + .and_then(|cg| { + let member = cg.members.get(member_id.0)?; + if !member + .pending_revocations + .iter() + .any(|(pid, _)| *pid == partition_id) + { + return None; + } + let last_polled = { + let guard = member.last_polled_offsets.pin(); + guard.get(&partition_id).map(|v| v.load(Ordering::Relaxed)) + }; + let can_complete = match last_polled { + None => true, + Some(polled) => { + let committed = self + .metadata + .get_partition_consumer_group_offsets(stream_id, topic_id, partition_id) + .and_then(|offsets| { + let guard = offsets.pin(); + guard + .get(group_id) + .map(|co| co.offset.load(Ordering::Relaxed)) + }); + committed.is_some_and(|c| c >= polled) + } + }; + if can_complete { Some(member.id) } else { None } + }); + + if let Some(logical_member_id) = completion_info { + let request = + ShardRequest::control_plane(ShardRequestPayload::CompletePartitionRevocation { + stream_id, + topic_id, + group_id: group_id.0, + member_slab_id: member_id.0, + member_id: logical_member_id, + partition_id, + }); + let _ = self.send_to_control_plane(request).await; + } + } + async fn delete_all_files_in_dir(dir: &str) -> Result<(), IggyError> { let entries = match std::fs::read_dir(dir) { Ok(entries) => entries, diff --git a/core/server/src/shard/system/messages.rs b/core/server/src/shard/system/messages.rs index bc07ea457..0f3e82b7f 100644 --- a/core/server/src/shard/system/messages.rs +++ b/core/server/src/shard/system/messages.rs @@ -102,6 +102,20 @@ impl IggyShard { batch }; + // Track last offset sent to CG member for cooperative rebalance. 
+ if let PollingConsumer::ConsumerGroup(group_id, member_id) = &consumer + && let Some(last_offset) = batch.last_offset() + { + self.metadata.record_polled_offset( + topic.stream_id, + topic.topic_id, + group_id.0, + member_id.0, + partition_id, + last_offset, + ); + } + Ok((metadata, batch)) } @@ -281,6 +295,15 @@ impl IggyShard { }; crate::streaming::partitions::storage::persist_offset(&path, offset_value).await?; + + self.maybe_complete_pending_revocation( + &consumer, + namespace.stream_id(), + namespace.topic_id(), + namespace.partition_id(), + ) + .await; + Ok(()) } diff --git a/core/server/src/shard/transmission/frame.rs b/core/server/src/shard/transmission/frame.rs index e6d630837..ab4351011 100644 --- a/core/server/src/shard/transmission/frame.rs +++ b/core/server/src/shard/transmission/frame.rs @@ -94,6 +94,7 @@ pub enum ShardResponse { CreatePersonalAccessTokenResponse(PersonalAccessToken, String), DeletePersonalAccessTokenResponse, LeaveConsumerGroupMetadataOnlyResponse, + CompletePartitionRevocationResponse, PurgeStreamResponse, PurgeTopicResponse, ErrorResponse(IggyError), diff --git a/core/server/src/shard/transmission/message.rs b/core/server/src/shard/transmission/message.rs index 6fe952b92..7fbaddf37 100644 --- a/core/server/src/shard/transmission/message.rs +++ b/core/server/src/shard/transmission/message.rs @@ -222,6 +222,14 @@ pub enum ShardRequestPayload { group_id: usize, client_id: u32, }, + CompletePartitionRevocation { + stream_id: usize, + topic_id: usize, + group_id: usize, + member_slab_id: usize, + member_id: usize, + partition_id: usize, + }, // Control-plane: PAT operations CreatePersonalAccessTokenRequest {
