This is an automated email from the ASF dual-hosted git repository.

piotr pushed a commit to branch collaborative_rebalancing
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit cbdd441ece5f4a78706a12862e282560fd77d380
Author: spetz <[email protected]>
AuthorDate: Thu Feb 12 23:50:41 2026 +0100

    feat(server): cooperative partition reassignment for consumer groups
---
 Cargo.lock                                         |    2 +-
 DEPENDENCIES.md                                    |    2 +-
 core/integration/tests/data_integrity/mod.rs       |    1 +
 .../verify_consumer_group_partition_assignment.rs  | 1329 ++++++++++++++++++++
 core/server/Cargo.toml                             |    2 +-
 core/server/src/metadata/absorb.rs                 |   56 +-
 core/server/src/metadata/consumer_group.rs         |  216 +++-
 core/server/src/metadata/consumer_group_member.rs  |    8 +-
 core/server/src/metadata/ops.rs                    |   12 +-
 core/server/src/metadata/reader.rs                 |  116 +-
 core/server/src/metadata/writer.rs                 |   20 +
 core/server/src/shard/handlers.rs                  |   24 +
 core/server/src/shard/system/consumer_offsets.rs   |   71 +-
 core/server/src/shard/system/messages.rs           |   23 +
 core/server/src/shard/transmission/frame.rs        |    1 +
 core/server/src/shard/transmission/message.rs      |    8 +
 16 files changed, 1864 insertions(+), 27 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index f16507888..0eab45f5d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -8311,7 +8311,7 @@ dependencies = [
 
 [[package]]
 name = "server"
-version = "0.6.3-edge.2"
+version = "0.6.3-edge.3"
 dependencies = [
  "ahash 0.8.12",
  "anyhow",
diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md
index a9e14e6d4..6a3b6aff0 100644
--- a/DEPENDENCIES.md
+++ b/DEPENDENCIES.md
@@ -724,7 +724,7 @@ serde_with_macros: 3.16.1, "Apache-2.0 OR MIT",
 serde_yaml_ng: 0.10.0, "MIT",
 serial_test: 3.3.1, "MIT",
 serial_test_derive: 3.3.1, "MIT",
-server: 0.6.3-edge.2, "Apache-2.0",
+server: 0.6.3-edge.3, "Apache-2.0",
 sha1: 0.10.6, "Apache-2.0 OR MIT",
 sha2: 0.10.9, "Apache-2.0 OR MIT",
 sha3: 0.10.8, "Apache-2.0 OR MIT",
diff --git a/core/integration/tests/data_integrity/mod.rs b/core/integration/tests/data_integrity/mod.rs
index 742e3128e..7df802e24 100644
--- a/core/integration/tests/data_integrity/mod.rs
+++ b/core/integration/tests/data_integrity/mod.rs
@@ -17,4 +17,5 @@
  */
 
 mod verify_after_server_restart;
+mod verify_consumer_group_partition_assignment;
 mod verify_user_login_after_restart;
diff --git a/core/integration/tests/data_integrity/verify_consumer_group_partition_assignment.rs b/core/integration/tests/data_integrity/verify_consumer_group_partition_assignment.rs
new file mode 100644
index 000000000..203ba881e
--- /dev/null
+++ b/core/integration/tests/data_integrity/verify_consumer_group_partition_assignment.rs
@@ -0,0 +1,1329 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use iggy::prelude::*;
+use integration::iggy_harness;
+use std::collections::HashSet;
+use std::str::FromStr;
+use std::sync::Arc;
+use tokio::time::{Duration, sleep};
+
+const STREAM_NAME: &str = "cg-partition-test-stream";
+const TOPIC_NAME: &str = "cg-partition-test-topic";
+const CONSUMER_GROUP_NAME: &str = "cg-partition-test-group";
+const PARTITIONS_COUNT: u32 = 3;
+
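+/// A client with a 1h heartbeat never pings again after the initial connect,
+/// so the server's heartbeat verifier will flag it as stale - emulating a
+/// consumer killed with `kill -9` (no TCP FIN).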
+async fn create_stale_tcp_client(server_addr: &str) -> IggyClient {
+    let config = TcpClientConfig {
+        server_address: server_addr.to_string(),
+        heartbeat_interval: IggyDuration::from_str("1h").unwrap(),
+        nodelay: true,
+        ..TcpClientConfig::default()
+    };
+    let client = TcpClient::create(Arc::new(config)).unwrap();
+    Client::connect(&client).await.unwrap();
+    IggyClient::create(ClientWrapper::Tcp(client), None, None)
+}
+
+async fn create_tcp_client(server_addr: &str) -> IggyClient {
+    let config = TcpClientConfig {
+        server_address: server_addr.to_string(),
+        heartbeat_interval: IggyDuration::from_str("500ms").unwrap(),
+        nodelay: true,
+        ..TcpClientConfig::default()
+    };
+    let client = TcpClient::create(Arc::new(config)).unwrap();
+    Client::connect(&client).await.unwrap();
+    IggyClient::create(ClientWrapper::Tcp(client), None, None)
+}
+
+#[iggy_harness(server(
+    heartbeat.enabled = true,
+    heartbeat.interval = "2s",
+    tcp.socket.override_defaults = true,
+    tcp.socket.nodelay = true
+))]
+async fn should_not_duplicate_partition_assignments_after_stale_client_cleanup(
+    harness: &TestHarness,
+) {
+    let server_addr = harness.server().raw_tcp_addr().unwrap();
+
+    let root_client = create_tcp_client(&server_addr).await;
+    root_client
+        .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+        .await
+        .unwrap();
+
+    // 1. Create stream, topic with 3 partitions, consumer group
+    root_client.create_stream(STREAM_NAME).await.unwrap();
+    root_client
+        .create_topic(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            TOPIC_NAME,
+            PARTITIONS_COUNT,
+            CompressionAlgorithm::default(),
+            None,
+            IggyExpiry::NeverExpire,
+            MaxTopicSize::ServerDefault,
+        )
+        .await
+        .unwrap();
+    root_client
+        .create_consumer_group(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            &Identifier::named(TOPIC_NAME).unwrap(),
+            CONSUMER_GROUP_NAME,
+        )
+        .await
+        .unwrap();
+
+    // 2. Send a message to each partition
+    for partition_id in 0..PARTITIONS_COUNT {
+        let message = IggyMessage::from_str(&format!("message-partition-{partition_id}")).unwrap();
+        let mut messages = vec![message];
+        root_client
+            .send_messages(
+                &Identifier::named(STREAM_NAME).unwrap(),
+                &Identifier::named(TOPIC_NAME).unwrap(),
+                &Partitioning::partition_id(partition_id),
+                &mut messages,
+            )
+            .await
+            .unwrap();
+    }
+
+    // 3. Create 3 "stale" TCP clients (1h heartbeat - server will detect them as stale
+    //    after ~2.4s because they won't send any heartbeat).
+    let stale_client1 = create_stale_tcp_client(&server_addr).await;
+    let stale_client2 = create_stale_tcp_client(&server_addr).await;
+    let stale_client3 = create_stale_tcp_client(&server_addr).await;
+
+    for client in [&stale_client1, &stale_client2, &stale_client3] {
+        client
+            .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+            .await
+            .unwrap();
+        client
+            .join_consumer_group(
+                &Identifier::named(STREAM_NAME).unwrap(),
+                &Identifier::named(TOPIC_NAME).unwrap(),
+                &Identifier::named(CONSUMER_GROUP_NAME).unwrap(),
+            )
+            .await
+            .unwrap();
+    }
+
+    // 4. Verify initial state: 3 members, each with 1 unique partition
+    let cg = get_consumer_group(&root_client).await;
+    assert_eq!(cg.members_count, 3, "Expected 3 members before kill");
+    assert_unique_partition_assignments(&cg);
+
+    // 5. Poll a message from stale_client1 WITHOUT committing offset (simulating in-flight work)
+    let consumer = Consumer::group(Identifier::named(CONSUMER_GROUP_NAME).unwrap());
+    let polled = stale_client1
+        .poll_messages(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            &Identifier::named(TOPIC_NAME).unwrap(),
+            None,
+            &consumer,
+            &PollingStrategy::next(),
+            1,
+            false, // manual ack - offset NOT stored
+        )
+        .await
+        .unwrap();
+    assert!(
+        !polled.messages.is_empty(),
+        "stale_client1 should have polled at least one message"
+    );
+
+    // 6. DO NOT drop stale clients - simulating kill -9 (no TCP FIN).
+    //    We keep them alive in scope but won't use them again.
+    //    The server will detect them as stale after ~2.4s (heartbeat timeout).
+
+    // 7. Wait for the server's heartbeat verifier to evict the stale clients.
+    //    Server heartbeat interval = 2s, threshold = 2s * 1.2 = 2.4s.
+    //    Stale clients' heartbeat interval is 1h so they won't ping.
+    //    But they DID send one initial ping on connect, so we wait for that to expire.
+    //    Give it 5s to be safe.
+    sleep(Duration::from_secs(5)).await;
+
+    // 8. Verify ghosts have been evicted
+    let cg = get_consumer_group(&root_client).await;
+    assert_eq!(
+        cg.members_count, 0,
+        "Expected 0 members after heartbeat eviction of stale clients, got {}. 
Members: {:?}",
+        cg.members_count, cg.members
+    );
+
+    // 9. Now create 3 new clients and join same CG (simulating app restart after kill -9).
+    let client1 = create_tcp_client(&server_addr).await;
+    let client2 = create_tcp_client(&server_addr).await;
+    let client3 = create_tcp_client(&server_addr).await;
+
+    for client in [&client1, &client2, &client3] {
+        client
+            .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+            .await
+            .unwrap();
+        client
+            .join_consumer_group(
+                &Identifier::named(STREAM_NAME).unwrap(),
+                &Identifier::named(TOPIC_NAME).unwrap(),
+                &Identifier::named(CONSUMER_GROUP_NAME).unwrap(),
+            )
+            .await
+            .unwrap();
+    }
+
+    // 10. Verify exactly 3 members with unique partition assignments
+    let cg = get_consumer_group(&root_client).await;
+    assert_eq!(
+        cg.members_count, 3,
+        "Expected 3 members after new clients join, got {}. Members: {:?}",
+        cg.members_count, cg.members
+    );
+    assert_unique_partition_assignments(&cg);
+
+    // 11. Verify each new client can poll from its assigned partition
+    let consumer = Consumer::group(Identifier::named(CONSUMER_GROUP_NAME).unwrap());
+    let mut polled_partitions = HashSet::new();
+
+    for (i, client) in [&client1, &client2, &client3].iter().enumerate() {
+        let polled = client
+            .poll_messages(
+                &Identifier::named(STREAM_NAME).unwrap(),
+                &Identifier::named(TOPIC_NAME).unwrap(),
+                None,
+                &consumer,
+                &PollingStrategy::offset(0),
+                10,
+                false,
+            )
+            .await
+            .unwrap();
+        assert!(
+            !polled.messages.is_empty(),
+            "client{} should have messages but got none",
+            i + 1
+        );
+        assert!(
+            polled_partitions.insert(polled.partition_id),
+            "client{} got partition {} which was already assigned to another 
client! \
+             Duplicate partition assignment detected.",
+            i + 1,
+            polled.partition_id
+        );
+    }
+
+    assert_eq!(
+        polled_partitions.len(),
+        PARTITIONS_COUNT as usize,
+        "Expected each client to poll from a unique partition"
+    );
+
+    // Cleanup
+    drop(stale_client1);
+    drop(stale_client2);
+    drop(stale_client3);
+
+    root_client
+        .delete_stream(&Identifier::named(STREAM_NAME).unwrap())
+        .await
+        .unwrap();
+}
+
+#[iggy_harness(test_client_transport = [Tcp, WebSocket, Quic], server(
+    heartbeat.enabled = true,
+    heartbeat.interval = "2s",
+    tcp.socket.override_defaults = true,
+    tcp.socket.nodelay = true
+))]
+async fn should_not_reshuffle_partitions_when_new_member_joins(harness: &TestHarness) {
+    let root_client = harness.root_client().await.unwrap();
+
+    // 1. Create stream, topic with 3 partitions, consumer group
+    setup_stream_topic_cg(&root_client).await;
+
+    // 2. Create 2 clients, join CG → each gets partitions via incremental assign
+    let client1 = harness.new_client().await.unwrap();
+    let client2 = harness.new_client().await.unwrap();
+
+    for client in [&client1, &client2] {
+        client
+            .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+            .await
+            .unwrap();
+        join_cg(client).await;
+    }
+
+    // 3. Record the current partition assignments
+    let cg_before = get_consumer_group(&root_client).await;
+    assert_eq!(cg_before.members_count, 2);
+
+    let assignments_before: Vec<(u32, Vec<u32>)> = cg_before
+        .members
+        .iter()
+        .map(|m| (m.id, m.partitions.clone()))
+        .collect();
+
+    // 4. A 3rd client joins
+    let client3 = harness.new_client().await.unwrap();
+    client3
+        .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+        .await
+        .unwrap();
+    join_cg(&client3).await;
+
+    // 5. Verify: members that had exactly 1 partition still have it (stable assignment).
+    //    Members that were over-assigned may have given up excess partitions - that's expected.
+    let cg_after = get_consumer_group(&root_client).await;
+    assert_eq!(cg_after.members_count, 3);
+
+    for (old_id, old_partitions) in &assignments_before {
+        let member = cg_after.members.iter().find(|m| m.id == *old_id).unwrap();
+        // A member with 1 partition must keep it (no reshuffling mid-processing)
+        if old_partitions.len() == 1 {
+            assert_eq!(
+                &member.partitions, old_partitions,
+                "Member {old_id} had exactly 1 partition {old_partitions:?} 
but it changed to {:?}. \
+                 Single-partition assignments must be stable when new members 
join!",
+                member.partitions
+            );
+        }
+        // A member with multiple partitions may give up excess, but must keep at least 1
+        if !old_partitions.is_empty() {
+            assert!(
+                !member.partitions.is_empty(),
+                "Member {old_id} had partitions {old_partitions:?} but lost 
all of them. \
+                 Existing members must keep at least their fair share."
+            );
+        }
+    }
+
+    // 6. Verify all partitions are assigned uniquely
+    assert_unique_partition_assignments(&cg_after);
+
+    // Cleanup
+    root_client
+        .delete_stream(&Identifier::named(STREAM_NAME).unwrap())
+        .await
+        .unwrap();
+}
+
+#[iggy_harness(test_client_transport = [Tcp, WebSocket, Quic], server(
+    tcp.socket.override_defaults = true,
+    tcp.socket.nodelay = true
+))]
+async fn should_not_duplicate_partition_assignments_after_client_reconnect(harness: &TestHarness) {
+    let root_client = harness
+        .root_client()
+        .await
+        .expect("Failed to get root client");
+
+    // 1. Create stream, topic with 3 partitions, consumer group
+    root_client.create_stream(STREAM_NAME).await.unwrap();
+    root_client
+        .create_topic(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            TOPIC_NAME,
+            PARTITIONS_COUNT,
+            CompressionAlgorithm::default(),
+            None,
+            IggyExpiry::NeverExpire,
+            MaxTopicSize::ServerDefault,
+        )
+        .await
+        .unwrap();
+    root_client
+        .create_consumer_group(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            &Identifier::named(TOPIC_NAME).unwrap(),
+            CONSUMER_GROUP_NAME,
+        )
+        .await
+        .unwrap();
+
+    // 2. Send a message to each partition
+    for partition_id in 0..PARTITIONS_COUNT {
+        let message = IggyMessage::from_str(&format!("message-partition-{partition_id}")).unwrap();
+        let mut messages = vec![message];
+        root_client
+            .send_messages(
+                &Identifier::named(STREAM_NAME).unwrap(),
+                &Identifier::named(TOPIC_NAME).unwrap(),
+                &Partitioning::partition_id(partition_id),
+                &mut messages,
+            )
+            .await
+            .unwrap();
+    }
+
+    // 3. Create 3 clients, login, join CG
+    let client1 = harness.new_client().await.unwrap();
+    let client2 = harness.new_client().await.unwrap();
+    let client3 = harness.new_client().await.unwrap();
+
+    for client in [&client1, &client2, &client3] {
+        client
+            .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+            .await
+            .unwrap();
+        client
+            .join_consumer_group(
+                &Identifier::named(STREAM_NAME).unwrap(),
+                &Identifier::named(TOPIC_NAME).unwrap(),
+                &Identifier::named(CONSUMER_GROUP_NAME).unwrap(),
+            )
+            .await
+            .unwrap();
+    }
+
+    // 4. Verify initial state
+    let cg = get_consumer_group(&root_client).await;
+    assert_eq!(cg.members_count, 3, "Expected 3 members before disconnect");
+    assert_unique_partition_assignments(&cg);
+
+    // 5. Drop all 3 clients (graceful disconnect - TCP FIN sent)
+    drop(client1);
+    drop(client2);
+    drop(client3);
+
+    // 6. Wait for server to process the disconnects
+    sleep(Duration::from_millis(500)).await;
+
+    // 7. Create 3 new clients, join same CG
+    let new_client1 = harness.new_client().await.unwrap();
+    let new_client2 = harness.new_client().await.unwrap();
+    let new_client3 = harness.new_client().await.unwrap();
+
+    for client in [&new_client1, &new_client2, &new_client3] {
+        client
+            .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+            .await
+            .unwrap();
+        client
+            .join_consumer_group(
+                &Identifier::named(STREAM_NAME).unwrap(),
+                &Identifier::named(TOPIC_NAME).unwrap(),
+                &Identifier::named(CONSUMER_GROUP_NAME).unwrap(),
+            )
+            .await
+            .unwrap();
+    }
+
+    // 8. Verify exactly 3 members with unique partition assignments
+    let cg = get_consumer_group(&root_client).await;
+    assert_eq!(
+        cg.members_count, 3,
+        "Expected exactly 3 members after reconnect, got {}. Members: {:?}",
+        cg.members_count, cg.members
+    );
+    assert_unique_partition_assignments(&cg);
+
+    // 9. Verify polling works correctly
+    let consumer = Consumer::group(Identifier::named(CONSUMER_GROUP_NAME).unwrap());
+    let mut messages_per_client = Vec::new();
+
+    for client in [&new_client1, &new_client2, &new_client3] {
+        let polled = client
+            .poll_messages(
+                &Identifier::named(STREAM_NAME).unwrap(),
+                &Identifier::named(TOPIC_NAME).unwrap(),
+                None,
+                &consumer,
+                &PollingStrategy::offset(0),
+                10,
+                false,
+            )
+            .await
+            .unwrap();
+        messages_per_client.push(polled.messages.len());
+    }
+
+    let total_messages: usize = messages_per_client.iter().sum();
+    assert_eq!(
+        total_messages, PARTITIONS_COUNT as usize,
+        "Expected {PARTITIONS_COUNT} total messages across all clients, got 
{total_messages}. \
+         Messages per client: {:?}",
+        messages_per_client
+    );
+
+    // Cleanup
+    root_client
+        .delete_stream(&Identifier::named(STREAM_NAME).unwrap())
+        .await
+        .unwrap();
+}
+
+async fn get_consumer_group(client: &IggyClient) -> ConsumerGroupDetails {
+    client
+        .get_consumer_group(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            &Identifier::named(TOPIC_NAME).unwrap(),
+            &Identifier::named(CONSUMER_GROUP_NAME).unwrap(),
+        )
+        .await
+        .unwrap()
+        .expect("Failed to get consumer group")
+}
+
+fn assert_unique_partition_assignments(cg: &ConsumerGroupDetails) {
+    let mut all_partitions = HashSet::new();
+    let mut total_assigned = 0u32;
+
+    for member in &cg.members {
+        for &partition in &member.partitions {
+            total_assigned += 1;
+            assert!(
+                all_partitions.insert(partition),
+                "Partition {partition} is assigned to multiple members! \
+                 This means two consumers will process the same messages. \
+                 Consumer group members: {:?}",
+                cg.members
+            );
+        }
+    }
+
+    assert_eq!(
+        total_assigned, PARTITIONS_COUNT,
+        "Expected {PARTITIONS_COUNT} total partition assignments, got 
{total_assigned}. \
+         Members: {:?}",
+        cg.members
+    );
+}
+
+async fn setup_stream_topic_cg(client: &IggyClient) {
+    client.create_stream(STREAM_NAME).await.unwrap();
+    client
+        .create_topic(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            TOPIC_NAME,
+            PARTITIONS_COUNT,
+            CompressionAlgorithm::default(),
+            None,
+            IggyExpiry::NeverExpire,
+            MaxTopicSize::ServerDefault,
+        )
+        .await
+        .unwrap();
+    client
+        .create_consumer_group(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            &Identifier::named(TOPIC_NAME).unwrap(),
+            CONSUMER_GROUP_NAME,
+        )
+        .await
+        .unwrap();
+}
+
+async fn send_one_message_per_partition(client: &IggyClient) {
+    for partition_id in 0..PARTITIONS_COUNT {
+        let message = IggyMessage::from_str(&format!("message-partition-{partition_id}")).unwrap();
+        let mut messages = vec![message];
+        client
+            .send_messages(
+                &Identifier::named(STREAM_NAME).unwrap(),
+                &Identifier::named(TOPIC_NAME).unwrap(),
+                &Partitioning::partition_id(partition_id),
+                &mut messages,
+            )
+            .await
+            .unwrap();
+    }
+}
+
+async fn join_cg(client: &IggyClient) {
+    client
+        .join_consumer_group(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            &Identifier::named(TOPIC_NAME).unwrap(),
+            &Identifier::named(CONSUMER_GROUP_NAME).unwrap(),
+        )
+        .await
+        .unwrap();
+}
+
+#[iggy_harness(test_client_transport = [Tcp, WebSocket, Quic], server(
+    heartbeat.enabled = true,
+    heartbeat.interval = "2s",
+    tcp.socket.override_defaults = true,
+    tcp.socket.nodelay = true
+))]
+async fn should_not_return_same_message_to_two_consumers_during_rebalance(harness: &TestHarness) {
+    let root_client = harness.root_client().await.unwrap();
+
+    setup_stream_topic_cg(&root_client).await;
+    send_one_message_per_partition(&root_client).await;
+
+    // 1. Consumer1 joins, gets all 3 partitions
+    let client1 = harness.new_client().await.unwrap();
+    client1
+        .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+        .await
+        .unwrap();
+    join_cg(&client1).await;
+
+    let cg = get_consumer_group(&root_client).await;
+    assert_eq!(cg.members_count, 1);
+    assert_eq!(cg.members[0].partitions_count, PARTITIONS_COUNT);
+
+    // 2. Consumer1 polls a message WITHOUT committing (simulating in-flight processing)
+    let consumer = Consumer::group(Identifier::named(CONSUMER_GROUP_NAME).unwrap());
+    let polled1 = client1
+        .poll_messages(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            &Identifier::named(TOPIC_NAME).unwrap(),
+            None,
+            &consumer,
+            &PollingStrategy::next(),
+            1,
+            false, // manual commit - offset NOT stored
+        )
+        .await
+        .unwrap();
+    assert_eq!(polled1.messages.len(), 1, "Consumer1 should get a message");
+    let message_from_client1 = &polled1.messages[0];
+    let partition_polled_by_client1 = polled1.partition_id;
+
+    // 3. Consumer2 joins while consumer1 has in-flight work
+    let client2 = harness.new_client().await.unwrap();
+    client2
+        .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+        .await
+        .unwrap();
+    join_cg(&client2).await;
+
+    // 4. Consumer2 polls - it must NOT get the same partition as consumer1's
+    //    in-flight message. Due to cooperative rebalance, the partition with
+    //    in-flight work stays with consumer1 until committed.
+    let polled2 = client2
+        .poll_messages(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            &Identifier::named(TOPIC_NAME).unwrap(),
+            None,
+            &consumer,
+            &PollingStrategy::next(),
+            1,
+            false,
+        )
+        .await
+        .unwrap();
+
+    if !polled2.messages.is_empty() {
+        assert_ne!(
+            polled2.partition_id,
+            partition_polled_by_client1,
+            "Consumer2 got partition {} which consumer1 is still processing! \
+             Message ID from consumer1: offset={}, partition={}. \
+             Message ID from consumer2: offset={}, partition={}. \
+             This is the duplicate processing bug!",
+            polled2.partition_id,
+            message_from_client1.header.offset,
+            partition_polled_by_client1,
+            polled2.messages[0].header.offset,
+            polled2.partition_id,
+        );
+    }
+
+    // 5. Now consumer1 commits the offset
+    client1
+        .store_consumer_offset(
+            &consumer,
+            &Identifier::named(STREAM_NAME).unwrap(),
+            &Identifier::named(TOPIC_NAME).unwrap(),
+            Some(partition_polled_by_client1),
+            message_from_client1.header.offset,
+        )
+        .await
+        .unwrap();
+
+    // 6. Give the server a moment to process the revocation completion
+    sleep(Duration::from_millis(100)).await;
+
+    // 7. After commit, the partition should have transferred. Verify final state.
+    let cg = get_consumer_group(&root_client).await;
+    assert_eq!(cg.members_count, 2);
+    assert_unique_partition_assignments(&cg);
+
+    // Cleanup
+    root_client
+        .delete_stream(&Identifier::named(STREAM_NAME).unwrap())
+        .await
+        .unwrap();
+}
+
+#[iggy_harness(test_client_transport = [Tcp, WebSocket, Quic], server(
+    heartbeat.enabled = true,
+    heartbeat.interval = "2s",
+    tcp.socket.override_defaults = true,
+    tcp.socket.nodelay = true
+))]
+async fn should_complete_revocation_on_auto_commit(harness: &TestHarness) {
+    let root_client = harness.root_client().await.unwrap();
+
+    setup_stream_topic_cg(&root_client).await;
+    send_one_message_per_partition(&root_client).await;
+
+    // 1. Consumer1 joins, polls with auto_commit=true
+    let client1 = harness.new_client().await.unwrap();
+    client1
+        .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+        .await
+        .unwrap();
+    join_cg(&client1).await;
+
+    let consumer = Consumer::group(Identifier::named(CONSUMER_GROUP_NAME).unwrap());
+
+    // Poll all 3 partitions with auto-commit
+    for _ in 0..PARTITIONS_COUNT {
+        client1
+            .poll_messages(
+                &Identifier::named(STREAM_NAME).unwrap(),
+                &Identifier::named(TOPIC_NAME).unwrap(),
+                None,
+                &consumer,
+                &PollingStrategy::next(),
+                1,
+                true, // auto-commit
+            )
+            .await
+            .unwrap();
+    }
+
+    // 2. Consumer2 joins - since consumer1 already committed all offsets,
+    //    the revocations should complete immediately.
+    let client2 = harness.new_client().await.unwrap();
+    client2
+        .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+        .await
+        .unwrap();
+    join_cg(&client2).await;
+
+    // Small delay for revocation processing
+    sleep(Duration::from_millis(100)).await;
+
+    // 3. Verify partitions are distributed
+    let cg = get_consumer_group(&root_client).await;
+    assert_eq!(cg.members_count, 2);
+    assert_unique_partition_assignments(&cg);
+
+    // Both members should have partitions (not all stuck on consumer1)
+    for member in &cg.members {
+        assert!(
+            !member.partitions.is_empty(),
+            "Member {} has no partitions - revocation may not have completed. 
Members: {:?}",
+            member.id,
+            cg.members
+        );
+    }
+
+    // Cleanup
+    root_client
+        .delete_stream(&Identifier::named(STREAM_NAME).unwrap())
+        .await
+        .unwrap();
+}
+
+#[iggy_harness(test_client_transport = [Tcp, WebSocket, Quic], server(
+    heartbeat.enabled = true,
+    heartbeat.interval = "2s",
+    tcp.socket.override_defaults = true,
+    tcp.socket.nodelay = true
+))]
+async fn should_transfer_never_polled_partitions_immediately(harness: &TestHarness) {
+    let root_client = harness.root_client().await.unwrap();
+
+    setup_stream_topic_cg(&root_client).await;
+    // NOTE: no messages sent - all partitions are empty
+
+    // 1. Consumer1 joins, gets all 3 partitions
+    let client1 = harness.new_client().await.unwrap();
+    client1
+        .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+        .await
+        .unwrap();
+    join_cg(&client1).await;
+
+    // 2. Consumer2 joins - partitions should be distributed immediately
+    //    because consumer1 never polled anything
+    let client2 = harness.new_client().await.unwrap();
+    client2
+        .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+        .await
+        .unwrap();
+    join_cg(&client2).await;
+
+    // 3. Consumer3 joins
+    let client3 = harness.new_client().await.unwrap();
+    client3
+        .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+        .await
+        .unwrap();
+    join_cg(&client3).await;
+
+    // 4. All 3 members should have exactly 1 partition each - immediately,
+    //    no waiting for commits
+    let cg = get_consumer_group(&root_client).await;
+    assert_eq!(cg.members_count, 3);
+    assert_unique_partition_assignments(&cg);
+
+    for member in &cg.members {
+        assert_eq!(
+            member.partitions_count, 1,
+            "Each member should have exactly 1 partition. Member {} has {}. 
Members: {:?}",
+            member.id, member.partitions_count, cg.members
+        );
+    }
+
+    // Cleanup
+    root_client
+        .delete_stream(&Identifier::named(STREAM_NAME).unwrap())
+        .await
+        .unwrap();
+}
+
+#[iggy_harness(test_client_transport = [Tcp, WebSocket, Quic], server(
+    heartbeat.enabled = true,
+    heartbeat.interval = "2s",
+    tcp.socket.override_defaults = true,
+    tcp.socket.nodelay = true
+))]
+async fn should_rebalance_when_member_with_pending_revocation_leaves(harness: &TestHarness) {
+    let root_client = harness.root_client().await.unwrap();
+
+    setup_stream_topic_cg(&root_client).await;
+    send_one_message_per_partition(&root_client).await;
+
+    // 1. Consumer1 joins, polls WITHOUT commit (creates in-flight work)
+    let client1 = harness.new_client().await.unwrap();
+    client1
+        .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+        .await
+        .unwrap();
+    join_cg(&client1).await;
+
+    let consumer = Consumer::group(Identifier::named(CONSUMER_GROUP_NAME).unwrap());
+    let _polled = client1
+        .poll_messages(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            &Identifier::named(TOPIC_NAME).unwrap(),
+            None,
+            &consumer,
+            &PollingStrategy::next(),
+            1,
+            false,
+        )
+        .await
+        .unwrap();
+
+    // 2. Consumer2 joins - some partitions become pending revocation
+    let client2 = harness.new_client().await.unwrap();
+    client2
+        .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+        .await
+        .unwrap();
+    join_cg(&client2).await;
+
+    // 3. Consumer1 disconnects (graceful) - should trigger full rebalance
+    //    clearing all pending revocations
+    drop(client1);
+    sleep(Duration::from_millis(500)).await;
+
+    // 4. Consumer2 should now get ALL partitions via full rebalance
+    let cg = get_consumer_group(&root_client).await;
+    assert_eq!(
+        cg.members_count, 1,
+        "Only consumer2 should remain. Members: {:?}",
+        cg.members
+    );
+    assert_eq!(
+        cg.members[0].partitions_count, PARTITIONS_COUNT,
+        "Consumer2 should have all partitions after consumer1 left. Members: 
{:?}",
+        cg.members
+    );
+
+    // 5. Consumer2 can actually poll from all partitions
+    let mut polled_partitions = HashSet::new();
+    for _ in 0..PARTITIONS_COUNT {
+        let polled = client2
+            .poll_messages(
+                &Identifier::named(STREAM_NAME).unwrap(),
+                &Identifier::named(TOPIC_NAME).unwrap(),
+                None,
+                &consumer,
+                &PollingStrategy::offset(0),
+                1,
+                false,
+            )
+            .await
+            .unwrap();
+        if !polled.messages.is_empty() {
+            polled_partitions.insert(polled.partition_id);
+        }
+    }
+    assert_eq!(
+        polled_partitions.len(),
+        PARTITIONS_COUNT as usize,
+        "Consumer2 should be able to poll from all {PARTITIONS_COUNT} 
partitions"
+    );
+
+    // Cleanup
+    root_client
+        .delete_stream(&Identifier::named(STREAM_NAME).unwrap())
+        .await
+        .unwrap();
+}
+
+#[iggy_harness(test_client_transport = [Tcp, WebSocket, Quic], server(
+    heartbeat.enabled = true,
+    heartbeat.interval = "2s",
+    tcp.socket.override_defaults = true,
+    tcp.socket.nodelay = true
+))]
+async fn should_not_produce_duplicate_messages_with_sequential_consumer_joins(
+    harness: &TestHarness,
+) {
+    let root_client = harness.root_client().await.unwrap();
+
+    setup_stream_topic_cg(&root_client).await;
+
+    // Send 5 messages to each partition
+    for partition_id in 0..PARTITIONS_COUNT {
+        for msg_idx in 0..5u32 {
+            let message = IggyMessage::from_str(&format!("p{partition_id}-msg{msg_idx}")).unwrap();
+            let mut messages = vec![message];
+            root_client
+                .send_messages(
+                    &Identifier::named(STREAM_NAME).unwrap(),
+                    &Identifier::named(TOPIC_NAME).unwrap(),
+                    &Partitioning::partition_id(partition_id),
+                    &mut messages,
+                )
+                .await
+                .unwrap();
+        }
+    }
+
+    let consumer = Consumer::group(Identifier::named(CONSUMER_GROUP_NAME).unwrap());
+
+    // 1. Consumer1 joins, gets all 3 partitions
+    let client1 = harness.new_client().await.unwrap();
+    client1
+        .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+        .await
+        .unwrap();
+    join_cg(&client1).await;
+
+    // 2. Consumer1 polls 1 message with auto-commit (like the production app does)
+    let polled1 = client1
+        .poll_messages(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            &Identifier::named(TOPIC_NAME).unwrap(),
+            None,
+            &consumer,
+            &PollingStrategy::next(),
+            1,
+            true, // auto-commit like production
+        )
+        .await
+        .unwrap();
+    assert_eq!(polled1.messages.len(), 1);
+    let first_partition = polled1.partition_id;
+    let first_offset = polled1.messages[0].header.offset;
+
+    // 3. Consumer2 joins ~50ms later (simulating production timing)
+    sleep(Duration::from_millis(50)).await;
+    let client2 = harness.new_client().await.unwrap();
+    client2
+        .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+        .await
+        .unwrap();
+    join_cg(&client2).await;
+
+    // 4. Consumer3 joins ~50ms later
+    sleep(Duration::from_millis(50)).await;
+    let client3 = harness.new_client().await.unwrap();
+    client3
+        .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+        .await
+        .unwrap();
+    join_cg(&client3).await;
+
+    // 5. Small delay for revocation completions
+    sleep(Duration::from_millis(200)).await;
+
+    // 6. All 3 consumers poll messages - collect ALL messages across all consumers
+    let mut all_messages: Vec<(u32, u64)> = Vec::new(); // (partition_id, offset)
+
+    // Already got one from consumer1
+    all_messages.push((first_partition, first_offset));
+
+    // Each consumer polls multiple times to drain their partitions
+    for client in [&client1, &client2, &client3] {
+        for _ in 0..20 {
+            let polled = client
+                .poll_messages(
+                    &Identifier::named(STREAM_NAME).unwrap(),
+                    &Identifier::named(TOPIC_NAME).unwrap(),
+                    None,
+                    &consumer,
+                    &PollingStrategy::next(),
+                    1,
+                    true,
+                )
+                .await
+                .unwrap();
+            if polled.messages.is_empty() {
+                break;
+            }
+            for msg in &polled.messages {
+                all_messages.push((polled.partition_id, msg.header.offset));
+            }
+        }
+    }
+
+    // 7. Check for duplicates: same (partition_id, offset) must never appear twice
+    let mut seen = HashSet::new();
+    for (partition_id, offset) in &all_messages {
+        assert!(
+            seen.insert((*partition_id, *offset)),
+            "DUPLICATE MESSAGE DETECTED! partition={partition_id}, 
offset={offset}. \
+             This is the exact production bug - two consumers processed the 
same message. \
+             All messages: {all_messages:?}"
+        );
+    }
+
+    // 8. Verify we got all 15 messages (5 per partition x 3 partitions)
+    assert_eq!(
+        all_messages.len(),
+        15,
+        "Expected 15 total messages (5 per partition x 3 partitions), got {}. \
+         Messages: {:?}",
+        all_messages.len(),
+        all_messages
+    );
+
+    // Cleanup
+    root_client
+        .delete_stream(&Identifier::named(STREAM_NAME).unwrap())
+        .await
+        .unwrap();
+}
+
+#[iggy_harness(test_client_transport = [Tcp, WebSocket, Quic], server(
+    heartbeat.enabled = true,
+    heartbeat.interval = "2s",
+    tcp.socket.override_defaults = true,
+    tcp.socket.nodelay = true
+))]
+async fn should_wait_for_manual_commit_before_completing_revocation(harness: &TestHarness) {
+    let root_client = harness.root_client().await.unwrap();
+
+    setup_stream_topic_cg(&root_client).await;
+    send_one_message_per_partition(&root_client).await;
+
+    // 1. Client1 joins, rapidly polls all 3 partitions WITHOUT committing
+    let client1 = harness.new_client().await.unwrap();
+    client1
+        .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+        .await
+        .unwrap();
+    join_cg(&client1).await;
+
+    let consumer = Consumer::group(Identifier::named(CONSUMER_GROUP_NAME).unwrap());
+    let mut polled_offsets: Vec<(u32, u64)> = Vec::new();
+
+    for _ in 0..PARTITIONS_COUNT {
+        let polled = client1
+            .poll_messages(
+                &Identifier::named(STREAM_NAME).unwrap(),
+                &Identifier::named(TOPIC_NAME).unwrap(),
+                None,
+                &consumer,
+                &PollingStrategy::next(),
+                1,
+                false, // manual commit
+            )
+            .await
+            .unwrap();
+        assert_eq!(polled.messages.len(), 1);
+        polled_offsets.push((polled.partition_id, polled.messages[0].header.offset));
+    }
+
+    // 2. Consumer2 joins - all partitions have in-flight work
+    let client2 = harness.new_client().await.unwrap();
+    client2
+        .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+        .await
+        .unwrap();
+    join_cg(&client2).await;
+
+    sleep(Duration::from_millis(100)).await;
+
+    // 3. Client2 should get NOTHING - all partitions have pending revocations
+    //    that can't complete because client1 hasn't committed
+    let polled2 = client2
+        .poll_messages(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            &Identifier::named(TOPIC_NAME).unwrap(),
+            None,
+            &consumer,
+            &PollingStrategy::next(),
+            1,
+            false,
+        )
+        .await
+        .unwrap();
+    assert!(
+        polled2.messages.is_empty(),
+        "Client2 should get NO messages while client1 has uncommitted 
in-flight work \
+         on all partitions, but got {} messages from partition {}",
+        polled2.messages.len(),
+        polled2.partition_id
+    );
+
+    // 4. Client1 commits each partition one by one
+    for (partition_id, offset) in &polled_offsets {
+        client1
+            .store_consumer_offset(
+                &consumer,
+                &Identifier::named(STREAM_NAME).unwrap(),
+                &Identifier::named(TOPIC_NAME).unwrap(),
+                Some(*partition_id),
+                *offset,
+            )
+            .await
+            .unwrap();
+    }
+
+    sleep(Duration::from_millis(200)).await;
+
+    // 5. Now consumer2 should have partitions
+    let cg = get_consumer_group(&root_client).await;
+    assert_eq!(cg.members_count, 2);
+    assert_unique_partition_assignments(&cg);
+
+    for member in &cg.members {
+        assert!(
+            !member.partitions.is_empty(),
+            "Member {} has no partitions after client1 committed. Members: 
{:?}",
+            member.id,
+            cg.members
+        );
+    }
+
+    // Cleanup
+    root_client
+        .delete_stream(&Identifier::named(STREAM_NAME).unwrap())
+        .await
+        .unwrap();
+}
+
+#[iggy_harness(test_client_transport = [Tcp, WebSocket, Quic], server(
+    heartbeat.enabled = true,
+    heartbeat.interval = "2s",
+    tcp.socket.override_defaults = true,
+    tcp.socket.nodelay = true
+))]
+async fn should_redistribute_when_revocation_target_leaves(harness: &TestHarness) {
+    let root_client = harness.root_client().await.unwrap();
+
+    setup_stream_topic_cg(&root_client).await;
+    send_one_message_per_partition(&root_client).await;
+
+    // 1. Consumer1 joins, polls without commit (creates in-flight work)
+    let client1 = harness.new_client().await.unwrap();
+    client1
+        .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+        .await
+        .unwrap();
+    join_cg(&client1).await;
+
+    let consumer = Consumer::group(Identifier::named(CONSUMER_GROUP_NAME).unwrap());
+    let _polled = client1
+        .poll_messages(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            &Identifier::named(TOPIC_NAME).unwrap(),
+            None,
+            &consumer,
+            &PollingStrategy::next(),
+            1,
+            false,
+        )
+        .await
+        .unwrap();
+
+    // 2. Consumer2 joins - gets some partitions, some become pending revocation
+    let client2 = harness.new_client().await.unwrap();
+    client2
+        .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+        .await
+        .unwrap();
+    join_cg(&client2).await;
+
+    sleep(Duration::from_millis(100)).await;
+
+    // 3. Consumer2 (the revocation TARGET) disconnects before revocation completes
+    drop(client2);
+    sleep(Duration::from_millis(500)).await;
+
+    // 4. Consumer1 should now have ALL partitions back via full rebalance
+    let cg = get_consumer_group(&root_client).await;
+    assert_eq!(
+        cg.members_count, 1,
+        "Only consumer1 should remain. Members: {:?}",
+        cg.members
+    );
+    assert_eq!(
+        cg.members[0].partitions_count, PARTITIONS_COUNT,
+        "Consumer1 should have all {} partitions after target left. Members: 
{:?}",
+        PARTITIONS_COUNT, cg.members
+    );
+
+    // 5. Consumer1 can poll from all partitions
+    let mut polled_partitions = HashSet::new();
+    for _ in 0..PARTITIONS_COUNT {
+        let polled = client1
+            .poll_messages(
+                &Identifier::named(STREAM_NAME).unwrap(),
+                &Identifier::named(TOPIC_NAME).unwrap(),
+                None,
+                &consumer,
+                &PollingStrategy::offset(0),
+                1,
+                false,
+            )
+            .await
+            .unwrap();
+        if !polled.messages.is_empty() {
+            polled_partitions.insert(polled.partition_id);
+        }
+    }
+    assert_eq!(
+        polled_partitions.len(),
+        PARTITIONS_COUNT as usize,
+        "Consumer1 should poll from all partitions after target left"
+    );
+
+    // Cleanup
+    root_client
+        .delete_stream(&Identifier::named(STREAM_NAME).unwrap())
+        .await
+        .unwrap();
+}
+
+#[iggy_harness(test_client_transport = [Tcp, WebSocket, Quic], server(
+    heartbeat.enabled = true,
+    heartbeat.interval = "2s",
+    tcp.socket.override_defaults = true,
+    tcp.socket.nodelay = true
+))]
+async fn should_distribute_partitions_evenly_with_concurrent_joins(harness: &TestHarness) {
+    let root_client = harness.root_client().await.unwrap();
+
+    setup_stream_topic_cg(&root_client).await;
+    send_one_message_per_partition(&root_client).await;
+
+    // Create 3 clients and login
+    let client1 = harness.new_client().await.unwrap();
+    let client2 = harness.new_client().await.unwrap();
+    let client3 = harness.new_client().await.unwrap();
+
+    for client in [&client1, &client2, &client3] {
+        client
+            .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+            .await
+            .unwrap();
+    }
+
+    // Join all 3 concurrently
+    let stream_id = Identifier::named(STREAM_NAME).unwrap();
+    let topic_id = Identifier::named(TOPIC_NAME).unwrap();
+    let group_id = Identifier::named(CONSUMER_GROUP_NAME).unwrap();
+
+    let (r1, r2, r3) = tokio::join!(
+        client1.join_consumer_group(&stream_id, &topic_id, &group_id),
+        client2.join_consumer_group(&stream_id, &topic_id, &group_id),
+        client3.join_consumer_group(&stream_id, &topic_id, &group_id),
+    );
+    r1.unwrap();
+    r2.unwrap();
+    r3.unwrap();
+
+    // Small delay for any pending revocation completions
+    sleep(Duration::from_millis(200)).await;
+
+    // Verify: 3 members, each with exactly 1 partition
+    let cg = get_consumer_group(&root_client).await;
+    assert_eq!(
+        cg.members_count, 3,
+        "Expected 3 members after concurrent joins, got {}. Members: {:?}",
+        cg.members_count, cg.members
+    );
+    assert_unique_partition_assignments(&cg);
+
+    for member in &cg.members {
+        assert_eq!(
+            member.partitions_count, 1,
+            "Each member should have exactly 1 partition with concurrent 
joins. \
+             Member {} has {}. Members: {:?}",
+            member.id, member.partitions_count, cg.members
+        );
+    }
+
+    // Each consumer polls - must get unique partitions
+    let consumer = Consumer::group(Identifier::named(CONSUMER_GROUP_NAME).unwrap());
+    let mut polled_partitions = HashSet::new();
+
+    for (i, client) in [&client1, &client2, &client3].iter().enumerate() {
+        let polled = client
+            .poll_messages(
+                &Identifier::named(STREAM_NAME).unwrap(),
+                &Identifier::named(TOPIC_NAME).unwrap(),
+                None,
+                &consumer,
+                &PollingStrategy::offset(0),
+                10,
+                false,
+            )
+            .await
+            .unwrap();
+        assert!(
+            !polled.messages.is_empty(),
+            "Client {i} should have messages but got none"
+        );
+        assert!(
+            polled_partitions.insert(polled.partition_id),
+            "Client {i} got partition {} already taken by another client! 
Duplicate!",
+            polled.partition_id
+        );
+    }
+
+    assert_eq!(
+        polled_partitions.len(),
+        3,
+        "All 3 partitions must be covered"
+    );
+
+    // Cleanup
+    root_client
+        .delete_stream(&Identifier::named(STREAM_NAME).unwrap())
+        .await
+        .unwrap();
+}
diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml
index 995a65ca3..7d5b86867 100644
--- a/core/server/Cargo.toml
+++ b/core/server/Cargo.toml
@@ -17,7 +17,7 @@
 
 [package]
 name = "server"
-version = "0.6.3-edge.2"
+version = "0.6.3-edge.3"
 edition = "2024"
 license = "Apache-2.0"
 
diff --git a/core/server/src/metadata/absorb.rs b/core/server/src/metadata/absorb.rs
index 50af5b514..f02289e3c 100644
--- a/core/server/src/metadata/absorb.rs
+++ b/core/server/src/metadata/absorb.rs
@@ -19,17 +19,18 @@ use crate::metadata::ConsumerGroupMemberMeta;
 use crate::metadata::inner::InnerMetadata;
 use crate::metadata::ops::MetadataOp;
 use crate::metadata::{StreamId, UserId};
+use crate::streaming::polling_consumer::ConsumerGroupId;
 use iggy_common::Permissions;
 use left_right::Absorb;
 use std::sync::atomic::Ordering;
 
 impl Absorb<MetadataOp> for InnerMetadata {
-    fn absorb_first(&mut self, op: &mut MetadataOp, _other: &Self) {
-        apply_op(self, op, true);
+    fn absorb_first(&mut self, op: &mut MetadataOp, other: &Self) {
+        apply_op(self, op, true, other);
     }
 
-    fn absorb_second(&mut self, op: MetadataOp, _other: &Self) {
-        apply_op(self, &op, false);
+    fn absorb_second(&mut self, op: MetadataOp, other: &Self) {
+        apply_op(self, &op, false, other);
     }
 
     fn sync_with(&mut self, first: &Self) {
@@ -37,7 +38,12 @@ impl Absorb<MetadataOp> for InnerMetadata {
     }
 }
 
-fn apply_op(metadata: &mut InnerMetadata, op: &MetadataOp, populate_ids: bool) {
+fn apply_op(
+    metadata: &mut InnerMetadata,
+    op: &MetadataOp,
+    populate_ids: bool,
+    reader_copy: &InnerMetadata,
+) {
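+    // `reader_copy` is the other left-right map copy; data shards record
+    // last-polled offsets there, which the join handling below consults to
+    // decide whether a pending revocation can complete immediately.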
     match op {
         MetadataOp::Initialize(initial) => {
             *metadata = (**initial).clone();
@@ -301,7 +307,29 @@ fn apply_op(metadata: &mut InnerMetadata, op: &MetadataOp, populate_ids: bool) {
 
                 let new_member = ConsumerGroupMemberMeta::new(next_id, *client_id);
                 group.members.insert(new_member);
-                group.rebalance_members();
+                group.assign_partitions_to_new_members();
+            }
+
+            // Complete revocations for partitions with no in-flight work
+            if let Some(stream) = metadata.streams.get_mut(*stream_id)
+                && let Some(topic) = stream.topics.get_mut(*topic_id)
+            {
+                let cg_id = ConsumerGroupId(*group_id);
+                let reader_group = reader_copy
+                    .streams
+                    .get(*stream_id)
+                    .and_then(|s| s.topics.get(*topic_id))
+                    .and_then(|t| t.consumer_groups.get(*group_id));
+                let completable = topic
+                    .consumer_groups
+                    .get(*group_id)
+                    .map(|g| g.find_completable_revocations(&topic.partitions, cg_id, reader_group))
+                    .unwrap_or_default();
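+                // Completable revocations are gathered through shared borrows
+                // first, then applied via get_mut, so the group is never
+                // borrowed mutably and immutably at once.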
+                if let Some(group) = topic.consumer_groups.get_mut(*group_id) {
+                    for (member_slab_id, member_id, partition_id) in completable {
+                        group.complete_revocation(member_slab_id, member_id, partition_id);
+                    }
+                }
             }
         }
 
@@ -351,6 +379,22 @@ fn apply_op(metadata: &mut InnerMetadata, op: &MetadataOp, populate_ids: bool) {
                 }
             }
         }
+
+        MetadataOp::CompletePartitionRevocation {
+            stream_id,
+            topic_id,
+            group_id,
+            member_slab_id,
+            member_id,
+            partition_id,
+        } => {
+            if let Some(stream) = metadata.streams.get_mut(*stream_id)
+                && let Some(topic) = stream.topics.get_mut(*topic_id)
+                && let Some(group) = topic.consumer_groups.get_mut(*group_id)
+            {
+                group.complete_revocation(*member_slab_id, *member_id, *partition_id);
+            }
+        }
     }
 }
 
diff --git a/core/server/src/metadata/consumer_group.rs b/core/server/src/metadata/consumer_group.rs
index d44dd5ecb..4c4bff942 100644
--- a/core/server/src/metadata/consumer_group.rs
+++ b/core/server/src/metadata/consumer_group.rs
@@ -16,9 +16,13 @@
 // under the License.
 
 use crate::metadata::consumer_group_member::ConsumerGroupMemberMeta;
+use crate::metadata::partition::PartitionMeta;
 use crate::metadata::{ConsumerGroupId, PartitionId};
+use crate::streaming::polling_consumer::ConsumerGroupId as CgId;
 use slab::Slab;
+use std::collections::HashSet;
 use std::sync::Arc;
+use std::sync::atomic::Ordering;
 
 #[derive(Clone, Debug)]
 pub struct ConsumerGroupMeta {
@@ -29,7 +33,8 @@ pub struct ConsumerGroupMeta {
 }
 
 impl ConsumerGroupMeta {
-    /// Rebalance partition assignments among members (round-robin).
+    /// Full rebalance: clear all assignments and redistribute round-robin.
+    /// Used when a member leaves or partition count changes.
     pub fn rebalance_members(&mut self) {
         let partition_count = self.partitions.len();
         let member_count = self.members.len();
@@ -38,11 +43,12 @@ impl ConsumerGroupMeta {
             return;
         }
 
-        // Clear all member partitions
+        // Clear all member partitions and pending revocations
         let member_ids: Vec<usize> = self.members.iter().map(|(id, _)| id).collect();
         for &member_id in &member_ids {
             if let Some(member) = self.members.get_mut(member_id) {
                 member.partitions.clear();
+                member.pending_revocations.clear();
             }
         }
 
@@ -56,4 +62,210 @@ impl ConsumerGroupMeta {
             }
         }
     }
+
+    /// Incremental partition assignment for a new member join.
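+    /// Unassigned partitions go straight to idle members; excess partitions on
+    /// over-assigned members are only marked as pending revocations that target
+    /// an idle member, so nothing moves while in-flight work is uncommitted.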
+    pub fn assign_partitions_to_new_members(&mut self) {
+        let member_count = self.members.len();
+        if member_count == 0 || self.partitions.is_empty() {
+            return;
+        }
+
+        // Find which partitions are already assigned
+        let mut assigned: HashSet<PartitionId> = HashSet::new();
+        for (_, member) in self.members.iter() {
+            for &pid in &member.partitions {
+                assigned.insert(pid);
+            }
+        }
+
+        // Step 1: Assign unassigned partitions to idle members
+        let unassigned: Vec<PartitionId> = self
+            .partitions
+            .iter()
+            .copied()
+            .filter(|pid| !assigned.contains(pid))
+            .collect();
+
+        if !unassigned.is_empty() {
+            let idle_member_ids: Vec<usize> = self
+                .members
+                .iter()
+                .filter(|(_, m)| m.partitions.is_empty())
+                .map(|(id, _)| id)
+                .collect();
+
+            if !idle_member_ids.is_empty() {
+                for (i, partition_id) in unassigned.into_iter().enumerate() {
+                    let member_idx = i % idle_member_ids.len();
+                    if let Some(member) = self.members.get_mut(idle_member_ids[member_idx]) {
+                        member.partitions.push(partition_id);
+                    }
+                }
+            }
+        }
+
+        // Step 2: Mark excess partitions as pending revocation
+        let partition_count = self.partitions.len();
+        let fair_share = partition_count / member_count;
+        let remainder = partition_count % member_count;
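+        // Example: 3 partitions and 2 members give fair_share = 1 and
+        // remainder = 1, so one member may legitimately keep 2 partitions.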
+
+        // Collect members already targeted by a pending revocation
+        let revocation_targets: HashSet<usize> = self
+            .members
+            .iter()
+            .flat_map(|(_, m)| m.pending_revocations.iter().map(|(_, target)| *target))
+            .collect();
+
+        // Collect idle members (no partitions and not already a revocation target)
+        let mut idle_slab_ids: Vec<usize> = self
+            .members
+            .iter()
+            .filter(|(id, m)| m.partitions.is_empty() && !revocation_targets.contains(id))
+            .map(|(id, _)| id)
+            .collect();
+
+        if idle_slab_ids.is_empty() {
+            return;
+        }
+
+        // Find over-assigned members and mark excess as pending revocation
+        let member_ids: Vec<usize> = self.members.iter().map(|(id, _)| id).collect();
+        let mut members_with_remainder = remainder;
+
+        for &mid in &member_ids {
+            if idle_slab_ids.is_empty() {
+                break;
+            }
+            let effective_count = self
+                .members
+                .get(mid)
+                .map(|m| {
+                    let pending: HashSet<PartitionId> =
+                        m.pending_revocations.iter().map(|(pid, _)| *pid).collect();
+                    m.partitions.iter().filter(|p| !pending.contains(p)).count()
+                })
+                .unwrap_or(0);
+
+            // This member's max allowed partitions
+            let max_allowed = if members_with_remainder > 0 {
+                fair_share + 1
+            } else {
+                fair_share
+            };
+
+            if effective_count <= max_allowed {
+                if effective_count > fair_share {
+                    members_with_remainder = members_with_remainder.saturating_sub(1);
+                }
+                continue;
+            }
+
+            // Mark excess partitions as pending revocation (from the END of the list)
+            let excess_count = effective_count - max_allowed;
+            if members_with_remainder > 0 && effective_count > fair_share {
+                members_with_remainder = members_with_remainder.saturating_sub(1);
+            }
+
+            // Collect partitions eligible for revocation (not already pending)
+            let revocable: Vec<PartitionId> = self
+                .members
+                .get(mid)
+                .map(|m| {
+                    let pending: HashSet<PartitionId> =
+                        m.pending_revocations.iter().map(|(pid, _)| *pid).collect();
+                    m.partitions
+                        .iter()
+                        .rev()
+                        .filter(|p| !pending.contains(p))
+                        .copied()
+                        .collect()
+                })
+                .unwrap_or_default();
+
+            for partition_id in revocable.into_iter().take(excess_count) {
+                if idle_slab_ids.is_empty() {
+                    break;
+                }
+                let idle_id = idle_slab_ids.remove(0);
+                if let Some(member) = self.members.get_mut(mid) {
+                    member.pending_revocations.push((partition_id, idle_id));
+                }
+            }
+        }
+    }
+
+    /// Find revocations that can complete now (member never polled, or its commit caught up to its last polled offset).
+    pub fn find_completable_revocations(
+        &self,
+        partitions: &[PartitionMeta],
+        cg_id: CgId,
+        reader_group: Option<&ConsumerGroupMeta>,
+    ) -> Vec<(usize, usize, PartitionId)> {
+        let mut result = Vec::new();
+        for (slab_id, member) in self.members.iter() {
+            // Read last_polled_offsets from the reader copy (where data shards write)
+            let reader_offsets = reader_group
+                .and_then(|rg| rg.members.get(slab_id))
+                .map(|rm| &rm.last_polled_offsets);
+
+            for &(pid, _) in &member.pending_revocations {
+                let last_polled = reader_offsets.and_then(|offsets| {
+                    let guard = offsets.pin();
+                    guard.get(&pid).map(|v| v.load(Ordering::Relaxed))
+                });
+                let can_complete = match last_polled {
+                    None => true,
+                    Some(polled) => partitions
+                        .get(pid)
+                        .and_then(|p| {
+                            let g = p.consumer_group_offsets.pin();
+                            g.get(&cg_id).map(|co| co.offset.load(Ordering::Relaxed))
+                        })
+                        .is_some_and(|committed| committed >= polled),
+                };
+                if can_complete {
+                    result.push((slab_id, member.id, pid));
+                }
+            }
+        }
+        result
+    }
+
+    /// Complete a pending revocation, moving the partition to the target member.
+    pub fn complete_revocation(
+        &mut self,
+        member_slab_id: usize,
+        member_id: usize,
+        partition_id: PartitionId,
+    ) -> bool {
+        let target_slab_id = if let Some(member) = self.members.get_mut(member_slab_id) {
+            if member.id != member_id {
+                return false;
+            }
+            let pos = member
+                .pending_revocations
+                .iter()
+                .position(|(pid, _)| *pid == partition_id);
+            if let Some(pos) = pos {
+                let (_, target) = member.pending_revocations.remove(pos);
+                member.partitions.retain(|&p| p != partition_id);
+                Some(target)
+            } else {
+                None
+            }
+        } else {
+            None
+        };
+
+        if let Some(target) = target_slab_id {
+            if let Some(target_member) = self.members.get_mut(target) {
+                target_member.partitions.push(partition_id);
+                return true;
+            }
+            self.rebalance_members();
+            return true;
+        }
+
+        false
+    }
 }
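
Note on the balancing math in assign_partitions_to_new_members: with p partitions and m members, the base fair share is p / m, and the first p % m members may keep one extra partition. A minimal standalone sketch of that bound (illustrative names, not the server code):

    // Fair-share bound: up to `remainder_slots` members may hold one
    // partition above the base share of partitions / members.
    fn max_allowed(partitions: usize, members: usize, remainder_slots: usize) -> usize {
        let fair_share = partitions / members;
        if remainder_slots > 0 { fair_share + 1 } else { fair_share }
    }

    fn main() {
        // 7 partitions over 3 members balance as 3 + 2 + 2.
        assert_eq!(max_allowed(7, 3, 1), 3); // a remainder slot is still open
        assert_eq!(max_allowed(7, 3, 0), 2); // remainder slots are used up
    }

Any partition held beyond that bound is marked as a pending revocation toward an idle member rather than moved immediately, which is what makes the rebalance cooperative.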
diff --git a/core/server/src/metadata/consumer_group_member.rs b/core/server/src/metadata/consumer_group_member.rs
index 8543351cb..fe78e593b 100644
--- a/core/server/src/metadata/consumer_group_member.rs
+++ b/core/server/src/metadata/consumer_group_member.rs
@@ -17,7 +17,7 @@
 
 use crate::metadata::{ClientId, ConsumerGroupMemberId, PartitionId};
 use std::sync::Arc;
-use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::{AtomicU64, AtomicUsize};
 
 #[derive(Clone, Debug)]
 pub struct ConsumerGroupMemberMeta {
@@ -25,6 +25,10 @@ pub struct ConsumerGroupMemberMeta {
     pub client_id: ClientId,
     pub partitions: Vec<PartitionId>,
     pub partition_index: Arc<AtomicUsize>,
+    /// Partitions pending cooperative revocation: (partition_id, target_member_slab_id).
+    pub pending_revocations: Vec<(PartitionId, usize)>,
+    /// Last offset returned per partition during poll, for cooperative rebalance.
+    pub last_polled_offsets: Arc<papaya::HashMap<PartitionId, Arc<AtomicU64>>>,
 }
 
 impl ConsumerGroupMemberMeta {
@@ -34,6 +38,8 @@ impl ConsumerGroupMemberMeta {
             client_id,
             partitions: Vec::new(),
             partition_index: Arc::new(AtomicUsize::new(0)),
+            pending_revocations: Vec::new(),
+            last_polled_offsets: Arc::new(papaya::HashMap::new()),
         }
     }
 }
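
The two new fields stay cheap to clone: pending_revocations is plain data, and last_polled_offsets wraps each counter in Arc<AtomicU64>, so copy-on-write snapshots of the metadata share the same counters and data shards can record offsets without republishing. A minimal sketch of that shared-counter property, using only std (names illustrative):

    use std::sync::Arc;
    use std::sync::atomic::{AtomicU64, Ordering};

    fn main() {
        // Cloning the Arc (as #[derive(Clone)] on the member meta does)
        // does not fork the counter: writes through one handle remain
        // visible through the other.
        let original = Arc::new(AtomicU64::new(0));
        let snapshot = Arc::clone(&original);
        original.store(42, Ordering::Relaxed);
        assert_eq!(snapshot.load(Ordering::Relaxed), 42);
    }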
diff --git a/core/server/src/metadata/ops.rs b/core/server/src/metadata/ops.rs
index 4c0328ee5..ce6d5915a 100644
--- a/core/server/src/metadata/ops.rs
+++ b/core/server/src/metadata/ops.rs
@@ -17,8 +17,8 @@
 
 use crate::metadata::inner::InnerMetadata;
 use crate::metadata::{
-    ConsumerGroupId, ConsumerGroupMeta, PartitionMeta, StreamId, StreamMeta, TopicId, TopicMeta,
-    UserId, UserMeta,
+    ConsumerGroupId, ConsumerGroupMeta, PartitionId, PartitionMeta, StreamId, StreamMeta, TopicId,
+    TopicMeta, UserId, UserMeta,
 };
 use iggy_common::{CompressionAlgorithm, IggyExpiry, MaxTopicSize, PersonalAccessToken};
 use std::sync::Arc;
@@ -119,4 +119,12 @@ pub enum MetadataOp {
         topic_id: TopicId,
         partitions_count: u32,
     },
+    CompletePartitionRevocation {
+        stream_id: StreamId,
+        topic_id: TopicId,
+        group_id: ConsumerGroupId,
+        member_slab_id: usize,
+        member_id: usize,
+        partition_id: PartitionId,
+    },
 }
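
CompletePartitionRevocation carries both member_slab_id and the logical member_id because slab slots are reused: a completion queued for a member that has since left must not act on the slot's new occupant. The guard in complete_revocation reduces to this check (hypothetical distilled form, not the server code):

    struct Member { id: usize }

    // Only complete if the slot is still occupied by the same logical
    // member the revocation was queued for.
    fn slot_matches(slot: Option<&Member>, expected_id: usize) -> bool {
        matches!(slot, Some(m) if m.id == expected_id)
    }

    fn main() {
        let occupant = Member { id: 7 };
        assert!(slot_matches(Some(&occupant), 7));  // same member: proceed
        assert!(!slot_matches(Some(&occupant), 3)); // slot reused: drop it
        assert!(!slot_matches(None, 3));            // member gone: drop it
    }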
diff --git a/core/server/src/metadata/reader.rs b/core/server/src/metadata/reader.rs
index d2ff0dd9a..bae83c982 100644
--- a/core/server/src/metadata/reader.rs
+++ b/core/server/src/metadata/reader.rs
@@ -271,25 +271,117 @@ impl Metadata {
             .members
             .get(member_id)?;
 
-        let assigned_partitions = &member.partitions;
-        if assigned_partitions.is_empty() {
+        // Fast path: no pending revocations
+        if member.pending_revocations.is_empty() {
+            let partitions = &member.partitions;
+            let count = partitions.len();
+            if count == 0 {
+                return None;
+            }
+            let counter = &member.partition_index;
+            return if calculate {
+                let current = counter
+                    .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |c| {
+                        Some((c + 1) % count)
+                    })
+                    .unwrap();
+                Some(partitions[current % count])
+            } else {
+                let current = counter.load(Ordering::Relaxed);
+                Some(partitions[current % count])
+            };
+        }
+
+        // Slow path: skip revoked partitions via linear scan
+        let effective_count = member.partitions.len() - member.pending_revocations.len();
+        if effective_count == 0 {
             return None;
         }
 
-        let partitions_count = assigned_partitions.len();
         let counter = &member.partition_index;
-
-        if calculate {
-            let current = counter
-                .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
-                    Some((current + 1) % partitions_count)
+        let idx = if calculate {
+            counter
+                .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |c| {
+                    Some((c + 1) % effective_count)
                 })
-                .unwrap();
-            Some(assigned_partitions[current % partitions_count])
+                .unwrap()
+                % effective_count
         } else {
-            let current = counter.load(Ordering::Relaxed);
-            Some(assigned_partitions[current % partitions_count])
+            counter.load(Ordering::Relaxed) % effective_count
+        };
+
+        // Find the idx-th non-revoked partition
+        let mut seen = 0;
+        for &pid in &member.partitions {
+            let is_revoked = member
+                .pending_revocations
+                .iter()
+                .any(|(rpid, _)| *rpid == pid);
+            if is_revoked {
+                continue;
+            }
+            if seen == idx {
+                return Some(pid);
+            }
+            seen += 1;
         }
+
+        None
+    }
+
+    /// Record the last offset returned to a CG member during poll.
+    pub fn record_polled_offset(
+        &self,
+        stream_id: StreamId,
+        topic_id: TopicId,
+        group_id: ConsumerGroupId,
+        member_id: usize,
+        partition_id: PartitionId,
+        offset: u64,
+    ) {
+        let metadata = self.load();
+        if let Some(member) = metadata
+            .streams
+            .get(stream_id)
+            .and_then(|s| s.topics.get(topic_id))
+            .and_then(|t| t.consumer_groups.get(group_id))
+            .and_then(|g| g.members.get(member_id))
+        {
+            let guard = member.last_polled_offsets.pin();
+            match guard.get(&partition_id) {
+                Some(existing) => {
+                    existing.store(offset, Ordering::Relaxed);
+                }
+                None => {
+                    guard.insert(
+                        partition_id,
+                        Arc::new(std::sync::atomic::AtomicU64::new(offset)),
+                    );
+                }
+            }
+        }
+    }
+
+    /// Get the last offset polled by a CG member for a partition.
+    pub fn get_last_polled_offset(
+        &self,
+        stream_id: StreamId,
+        topic_id: TopicId,
+        group_id: ConsumerGroupId,
+        member_id: usize,
+        partition_id: PartitionId,
+    ) -> Option<u64> {
+        let metadata = self.load();
+        metadata
+            .streams
+            .get(stream_id)
+            .and_then(|s| s.topics.get(topic_id))
+            .and_then(|t| t.consumer_groups.get(group_id))
+            .and_then(|g| g.members.get(member_id))
+            .and_then(|m| {
+                let guard = m.last_polled_offsets.pin();
+                guard.get(&partition_id).map(|v| v.load(Ordering::Relaxed))
+            })
     }
 
     pub fn users_count(&self) -> usize {
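
The slow path above reduces to picking the idx-th partition that is not pending revocation. A standalone sketch of that selection, std only (illustrative names):

    // Round-robin over the member's partitions, skipping any partition
    // that has a pending revocation.
    fn nth_non_revoked(partitions: &[usize], revoked: &[usize], idx: usize) -> Option<usize> {
        partitions
            .iter()
            .copied()
            .filter(|p| !revoked.contains(p))
            .nth(idx)
    }

    fn main() {
        // Partitions 1..=4 with partition 3 pending revocation rotate 1, 2, 4.
        assert_eq!(nth_non_revoked(&[1, 2, 3, 4], &[3], 0), Some(1));
        assert_eq!(nth_non_revoked(&[1, 2, 3, 4], &[3], 2), Some(4));
        assert_eq!(nth_non_revoked(&[1, 2, 3, 4], &[3], 3), None);
    }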
diff --git a/core/server/src/metadata/writer.rs b/core/server/src/metadata/writer.rs
index 2e3a409e9..3023bd00e 100644
--- a/core/server/src/metadata/writer.rs
+++ b/core/server/src/metadata/writer.rs
@@ -295,6 +295,26 @@ impl MetadataWriter {
         self.publish();
     }
 
+    pub fn complete_partition_revocation(
+        &mut self,
+        stream_id: StreamId,
+        topic_id: TopicId,
+        group_id: ConsumerGroupId,
+        member_slab_id: usize,
+        member_id: usize,
+        partition_id: PartitionId,
+    ) {
+        self.append(MetadataOp::CompletePartitionRevocation {
+            stream_id,
+            topic_id,
+            group_id,
+            member_slab_id,
+            member_id,
+            partition_id,
+        });
+        self.publish();
+    }
+
     // High-level registration methods with validation
 
     pub fn create_stream(
diff --git a/core/server/src/shard/handlers.rs b/core/server/src/shard/handlers.rs
index a2e14800c..7cbbb8368 100644
--- a/core/server/src/shard/handlers.rs
+++ b/core/server/src/shard/handlers.rs
@@ -352,6 +352,30 @@ async fn handle_request(
 
             Ok(ShardResponse::LeaveConsumerGroupMetadataOnlyResponse)
         }
+        ShardRequestPayload::CompletePartitionRevocation {
+            stream_id,
+            topic_id,
+            group_id,
+            member_slab_id,
+            member_id,
+            partition_id,
+        } => {
+            assert_eq!(
+                shard.id, 0,
+                "CompletePartitionRevocation should only be handled by shard0"
+            );
+
+            shard.writer().complete_partition_revocation(
+                stream_id,
+                topic_id,
+                group_id,
+                member_slab_id,
+                member_id,
+                partition_id,
+            );
+
+            Ok(ShardResponse::CompletePartitionRevocationResponse)
+        }
         ShardRequestPayload::SocketTransfer {
             fd,
             from_shard,
diff --git a/core/server/src/shard/system/consumer_offsets.rs b/core/server/src/shard/system/consumer_offsets.rs
index e8f76cde5..717ff1caa 100644
--- a/core/server/src/shard/system/consumer_offsets.rs
+++ b/core/server/src/shard/system/consumer_offsets.rs
@@ -19,7 +19,7 @@
 use super::COMPONENT;
 use crate::{
     shard::IggyShard,
-    shard::transmission::message::ResolvedTopic,
+    shard::transmission::message::{ResolvedTopic, ShardRequest, ShardRequestPayload},
     streaming::{
         partitions::consumer_offset::ConsumerOffset,
         polling_consumer::{ConsumerGroupId, PollingConsumer},
@@ -76,6 +76,15 @@ impl IggyShard {
             partition_id,
         )
         .await?;
+
+        self.maybe_complete_pending_revocation(
+            &polling_consumer,
+            topic.stream_id,
+            topic.topic_id,
+            partition_id,
+        )
+        .await;
+
         Ok((polling_consumer, partition_id))
     }
 
@@ -414,6 +423,66 @@ impl IggyShard {
         Ok(())
     }
 
+    /// Complete a pending partition revocation if this offset commit satisfies it.
+    pub(crate) async fn maybe_complete_pending_revocation(
+        &self,
+        polling_consumer: &PollingConsumer,
+        stream_id: usize,
+        topic_id: usize,
+        partition_id: usize,
+    ) {
+        let PollingConsumer::ConsumerGroup(group_id, member_id) = polling_consumer else {
+            return;
+        };
+
+        let completion_info = self
+            .metadata
+            .get_consumer_group(stream_id, topic_id, group_id.0)
+            .and_then(|cg| {
+                let member = cg.members.get(member_id.0)?;
+                if !member
+                    .pending_revocations
+                    .iter()
+                    .any(|(pid, _)| *pid == partition_id)
+                {
+                    return None;
+                }
+                let last_polled = {
+                    let guard = member.last_polled_offsets.pin();
+                    guard.get(&partition_id).map(|v| v.load(Ordering::Relaxed))
+                };
+                let can_complete = match last_polled {
+                    None => true,
+                    Some(polled) => {
+                        let committed = self
+                            .metadata
+                            .get_partition_consumer_group_offsets(stream_id, topic_id, partition_id)
+                            .and_then(|offsets| {
+                                let guard = offsets.pin();
+                                guard
+                                    .get(group_id)
+                                    .map(|co| co.offset.load(Ordering::Relaxed))
+                            });
+                        committed.is_some_and(|c| c >= polled)
+                    }
+                };
+                if can_complete { Some(member.id) } else { None }
+            });
+
+        if let Some(logical_member_id) = completion_info {
+            let request =
+                ShardRequest::control_plane(ShardRequestPayload::CompletePartitionRevocation {
+                    stream_id,
+                    topic_id,
+                    group_id: group_id.0,
+                    member_slab_id: member_id.0,
+                    member_id: logical_member_id,
+                    partition_id,
+                });
+            let _ = self.send_to_control_plane(request).await;
+        }
+    }
+
     async fn delete_all_files_in_dir(dir: &str) -> Result<(), IggyError> {
         let entries = match std::fs::read_dir(dir) {
             Ok(entries) => entries,
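
The completion check in maybe_complete_pending_revocation boils down to a pure predicate: a pending revocation may complete if the member never polled the partition, or if the group's committed offset has caught up to the last offset it polled. Distilled into a sketch (illustrative names, std only):

    fn can_complete(last_polled: Option<u64>, committed: Option<u64>) -> bool {
        match last_polled {
            // Never polled: nothing is in flight, safe to hand over.
            None => true,
            // Otherwise wait until the commit catches up to the poll.
            Some(polled) => committed.is_some_and(|c| c >= polled),
        }
    }

    fn main() {
        assert!(can_complete(None, None));         // never polled
        assert!(can_complete(Some(10), Some(10))); // commit caught up
        assert!(!can_complete(Some(10), Some(9))); // commit still lagging
        assert!(!can_complete(Some(10), None));    // polled, never committed
    }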
diff --git a/core/server/src/shard/system/messages.rs b/core/server/src/shard/system/messages.rs
index bc07ea457..0f3e82b7f 100644
--- a/core/server/src/shard/system/messages.rs
+++ b/core/server/src/shard/system/messages.rs
@@ -102,6 +102,20 @@ impl IggyShard {
             batch
         };
 
+        // Track last offset sent to CG member for cooperative rebalance.
+        if let PollingConsumer::ConsumerGroup(group_id, member_id) = &consumer
+            && let Some(last_offset) = batch.last_offset()
+        {
+            self.metadata.record_polled_offset(
+                topic.stream_id,
+                topic.topic_id,
+                group_id.0,
+                member_id.0,
+                partition_id,
+                last_offset,
+            );
+        }
+
         Ok((metadata, batch))
     }
 
@@ -281,6 +295,15 @@ impl IggyShard {
         };
 
         crate::streaming::partitions::storage::persist_offset(&path, offset_value).await?;
+
+        self.maybe_complete_pending_revocation(
+            &consumer,
+            namespace.stream_id(),
+            namespace.topic_id(),
+            namespace.partition_id(),
+        )
+        .await;
+
         Ok(())
     }
 
diff --git a/core/server/src/shard/transmission/frame.rs b/core/server/src/shard/transmission/frame.rs
index e6d630837..ab4351011 100644
--- a/core/server/src/shard/transmission/frame.rs
+++ b/core/server/src/shard/transmission/frame.rs
@@ -94,6 +94,7 @@ pub enum ShardResponse {
     CreatePersonalAccessTokenResponse(PersonalAccessToken, String),
     DeletePersonalAccessTokenResponse,
     LeaveConsumerGroupMetadataOnlyResponse,
+    CompletePartitionRevocationResponse,
     PurgeStreamResponse,
     PurgeTopicResponse,
     ErrorResponse(IggyError),
diff --git a/core/server/src/shard/transmission/message.rs b/core/server/src/shard/transmission/message.rs
index 6fe952b92..7fbaddf37 100644
--- a/core/server/src/shard/transmission/message.rs
+++ b/core/server/src/shard/transmission/message.rs
@@ -222,6 +222,14 @@ pub enum ShardRequestPayload {
         group_id: usize,
         client_id: u32,
     },
+    CompletePartitionRevocation {
+        stream_id: usize,
+        topic_id: usize,
+        group_id: usize,
+        member_slab_id: usize,
+        member_id: usize,
+        partition_id: usize,
+    },
 
     // Control-plane: PAT operations
     CreatePersonalAccessTokenRequest {
