This is an automated email from the ASF dual-hosted git repository.

piotr pushed a commit to branch collaborative_rebalancing
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit f515dc5b7158e5aa4a5033b0729e5859be51de4a
Author: spetz <[email protected]>
AuthorDate: Fri Feb 13 12:44:48 2026 +0100

    cleanup
---
 .../tests/connectors/postgres/postgres_source.rs   |    2 +-
 .../verify_consumer_group_partition_assignment.rs  | 1589 +++++++++++++++++++-
 core/server/src/bootstrap.rs                       |    1 -
 core/server/src/metadata/absorb.rs                 |   20 +-
 core/server/src/metadata/consumer_group.rs         |   42 +-
 core/server/src/metadata/consumer_group_member.rs  |    1 +
 core/server/src/metadata/inner.rs                  |    4 -
 core/server/src/metadata/ops.rs                    |    1 +
 core/server/src/metadata/reader.rs                 |  103 +-
 core/server/src/metadata/writer.rs                 |    3 +
 core/server/src/shard/handlers.rs                  |    2 +
 core/server/src/shard/system/consumer_groups.rs    |    1 +
 core/server/src/shard/system/consumer_offsets.rs   |    9 +-
 core/server/src/shard/system/messages.rs           |    4 +-
 .../src/shard/tasks/periodic/revocation_timeout.rs |    1 +
 core/server/src/shard/transmission/message.rs      |    1 +
 16 files changed, 1648 insertions(+), 136 deletions(-)

diff --git a/core/integration/tests/connectors/postgres/postgres_source.rs 
b/core/integration/tests/connectors/postgres/postgres_source.rs
index de8bc391a..5d322df36 100644
--- a/core/integration/tests/connectors/postgres/postgres_source.rs
+++ b/core/integration/tests/connectors/postgres/postgres_source.rs
@@ -497,7 +497,7 @@ async fn state_persists_across_connector_restart(
         .start_dependents()
         .await
         .expect("Failed to restart connectors");
-    sleep(Duration::from_millis(500)).await;
+    sleep(Duration::from_secs(2)).await;
 
     let mut received_after: Vec<DatabaseRecord> = Vec::new();
     for _ in 0..POLL_ATTEMPTS {
diff --git 
a/core/integration/tests/data_integrity/verify_consumer_group_partition_assignment.rs
 
b/core/integration/tests/data_integrity/verify_consumer_group_partition_assignment.rs
index fda75f8da..72d5d38a4 100644
--- 
a/core/integration/tests/data_integrity/verify_consumer_group_partition_assignment.rs
+++ 
b/core/integration/tests/data_integrity/verify_consumer_group_partition_assignment.rs
@@ -21,6 +21,7 @@ use integration::iggy_harness;
 use std::collections::HashSet;
 use std::str::FromStr;
 use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
 use tokio::time::{Duration, sleep};
 
 const STREAM_NAME: &str = "cg-partition-test-stream";
@@ -326,6 +327,1371 @@ async fn 
should_not_reshuffle_partitions_when_new_member_joins(harness: &TestHar
         .unwrap();
 }
 
+#[iggy_harness(test_client_transport = [Tcp, WebSocket, Quic], server(
+    heartbeat.enabled = true,
+    heartbeat.interval = "2s",
+    tcp.socket.override_defaults = true,
+    tcp.socket.nodelay = true
+))]
+async fn should_not_lose_messages_with_concurrent_polls_during_partition_add(
+    harness: &TestHarness,
+) {
+    let root_client = harness.root_client().await.unwrap();
+
+    setup_stream_topic_cg(&root_client).await;
+
+    // Send 10 messages per partition
+    for partition_id in 0..PARTITIONS_COUNT {
+        for i in 0..10u32 {
+            let message = 
IggyMessage::from_str(&format!("p{partition_id}-msg{i}")).unwrap();
+            let mut messages = vec![message];
+            root_client
+                .send_messages(
+                    &Identifier::named(STREAM_NAME).unwrap(),
+                    &Identifier::named(TOPIC_NAME).unwrap(),
+                    &Partitioning::partition_id(partition_id),
+                    &mut messages,
+                )
+                .await
+                .unwrap();
+        }
+    }
+
+    // 1. Two consumers join
+    let client1 = Arc::new(harness.new_client().await.unwrap());
+    let client2 = Arc::new(harness.new_client().await.unwrap());
+    for client in [&*client1, &*client2] {
+        client
+            .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+            .await
+            .unwrap();
+        join_cg(client).await;
+    }
+
+    sleep(Duration::from_millis(200)).await;
+
+    // 2. Start concurrent poll loops
+    let stop = Arc::new(AtomicBool::new(false));
+    let total_polled_1 = Arc::new(AtomicU64::new(0));
+    let total_polled_2 = Arc::new(AtomicU64::new(0));
+
+    let poll_task_1 = {
+        let client = client1.clone();
+        let stop = stop.clone();
+        let count = total_polled_1.clone();
+        tokio::spawn(async move {
+            let consumer = 
Consumer::group(Identifier::named(CONSUMER_GROUP_NAME).unwrap());
+            while !stop.load(Ordering::Relaxed) {
+                let polled = client
+                    .poll_messages(
+                        &Identifier::named(STREAM_NAME).unwrap(),
+                        &Identifier::named(TOPIC_NAME).unwrap(),
+                        None,
+                        &consumer,
+                        &PollingStrategy::next(),
+                        1,
+                        true,
+                    )
+                    .await;
+                if let Ok(polled) = polled {
+                    count.fetch_add(polled.messages.len() as u64, 
Ordering::Relaxed);
+                }
+                sleep(Duration::from_millis(10)).await;
+            }
+        })
+    };
+
+    let poll_task_2 = {
+        let client = client2.clone();
+        let stop = stop.clone();
+        let count = total_polled_2.clone();
+        tokio::spawn(async move {
+            let consumer = 
Consumer::group(Identifier::named(CONSUMER_GROUP_NAME).unwrap());
+            while !stop.load(Ordering::Relaxed) {
+                let polled = client
+                    .poll_messages(
+                        &Identifier::named(STREAM_NAME).unwrap(),
+                        &Identifier::named(TOPIC_NAME).unwrap(),
+                        None,
+                        &consumer,
+                        &PollingStrategy::next(),
+                        1,
+                        true,
+                    )
+                    .await;
+                if let Ok(polled) = polled {
+                    count.fetch_add(polled.messages.len() as u64, 
Ordering::Relaxed);
+                }
+                sleep(Duration::from_millis(10)).await;
+            }
+        })
+    };
+
+    // 3. While polling, add 3 partitions
+    sleep(Duration::from_millis(100)).await;
+    root_client
+        .create_partitions(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            &Identifier::named(TOPIC_NAME).unwrap(),
+            3,
+        )
+        .await
+        .unwrap();
+
+    // Send messages to new partitions
+    for partition_id in PARTITIONS_COUNT..PARTITIONS_COUNT + 3 {
+        for i in 0..10u32 {
+            let message = 
IggyMessage::from_str(&format!("p{partition_id}-msg{i}")).unwrap();
+            let mut messages = vec![message];
+            root_client
+                .send_messages(
+                    &Identifier::named(STREAM_NAME).unwrap(),
+                    &Identifier::named(TOPIC_NAME).unwrap(),
+                    &Partitioning::partition_id(partition_id),
+                    &mut messages,
+                )
+                .await
+                .unwrap();
+        }
+    }
+
+    // Let polling continue for a bit after partition add
+    sleep(Duration::from_secs(1)).await;
+    stop.store(true, Ordering::Relaxed);
+    let _ = poll_task_1.await;
+    let _ = poll_task_2.await;
+
+    // 4. Verify: no duplicates, all partitions assigned
+    let cg = get_consumer_group(&root_client).await;
+    assert_eq!(cg.members_count, 2);
+    assert_unique_partition_assignments(&cg);
+    let total: u32 = cg.members.iter().map(|m| m.partitions_count).sum();
+    assert_eq!(
+        total, 6,
+        "All 6 partitions must be assigned. Members: {:?}",
+        cg.members
+    );
+
+    // Both consumers should have polled something
+    let p1 = total_polled_1.load(Ordering::Relaxed);
+    let p2 = total_polled_2.load(Ordering::Relaxed);
+    assert!(
+        p1 + p2 > 0,
+        "Consumers should have polled at least some messages"
+    );
+
+    // Cleanup
+    root_client
+        .delete_stream(&Identifier::named(STREAM_NAME).unwrap())
+        .await
+        .unwrap();
+}
+
+#[iggy_harness(test_client_transport = [Tcp, WebSocket, Quic], server(
+    heartbeat.enabled = true,
+    heartbeat.interval = "2s",
+    tcp.socket.override_defaults = true,
+    tcp.socket.nodelay = true
+))]
+async fn 
should_handle_partition_add_then_consumer_disconnect_then_new_join(harness: 
&TestHarness) {
+    let root_client = harness.root_client().await.unwrap();
+
+    setup_stream_topic_cg(&root_client).await;
+    send_one_message_per_partition(&root_client).await;
+
+    // 1. Two consumers join
+    let client1 = harness.new_client().await.unwrap();
+    let client2 = harness.new_client().await.unwrap();
+    for client in [&client1, &client2] {
+        client
+            .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+            .await
+            .unwrap();
+        join_cg(client).await;
+    }
+
+    sleep(Duration::from_millis(200)).await;
+
+    // Both poll and auto-commit
+    let consumer = 
Consumer::group(Identifier::named(CONSUMER_GROUP_NAME).unwrap());
+    for client in [&client1, &client2] {
+        let _ = client
+            .poll_messages(
+                &Identifier::named(STREAM_NAME).unwrap(),
+                &Identifier::named(TOPIC_NAME).unwrap(),
+                None,
+                &consumer,
+                &PollingStrategy::next(),
+                1,
+                true,
+            )
+            .await
+            .unwrap();
+    }
+
+    // 2. Add 3 partitions (3 → 6)
+    root_client
+        .create_partitions(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            &Identifier::named(TOPIC_NAME).unwrap(),
+            3,
+        )
+        .await
+        .unwrap();
+
+    for partition_id in PARTITIONS_COUNT..PARTITIONS_COUNT + 3 {
+        let message = 
IggyMessage::from_str(&format!("new-p{partition_id}")).unwrap();
+        let mut messages = vec![message];
+        root_client
+            .send_messages(
+                &Identifier::named(STREAM_NAME).unwrap(),
+                &Identifier::named(TOPIC_NAME).unwrap(),
+                &Partitioning::partition_id(partition_id),
+                &mut messages,
+            )
+            .await
+            .unwrap();
+    }
+
+    sleep(Duration::from_millis(500)).await;
+
+    // 3. Consumer2 disconnects
+    drop(client2);
+    sleep(Duration::from_millis(500)).await;
+
+    let cg = get_consumer_group(&root_client).await;
+    assert_eq!(
+        cg.members_count, 1,
+        "Only consumer1 should remain. Members: {:?}",
+        cg.members
+    );
+    assert_eq!(
+        cg.members[0].partitions_count, 6,
+        "Consumer1 should have all 6 partitions after consumer2 left. Members: 
{:?}",
+        cg.members
+    );
+
+    // 4. New consumer3 joins
+    let client3 = harness.new_client().await.unwrap();
+    client3
+        .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+        .await
+        .unwrap();
+    join_cg(&client3).await;
+
+    sleep(Duration::from_millis(500)).await;
+
+    // 5. Verify: both members have partitions, no duplicates
+    let cg = get_consumer_group(&root_client).await;
+    assert_eq!(cg.members_count, 2);
+    assert_unique_partition_assignments(&cg);
+    let total: u32 = cg.members.iter().map(|m| m.partitions_count).sum();
+    assert_eq!(
+        total, 6,
+        "All 6 partitions must be assigned. Members: {:?}",
+        cg.members
+    );
+    for member in &cg.members {
+        assert!(
+            member.partitions_count >= 1,
+            "Both members must have partitions. Members: {:?}",
+            cg.members
+        );
+    }
+
+    // 6. Both can poll unique partitions
+    let mut polled_partitions = HashSet::new();
+    for client in [&client1, &client3] {
+        let polled = client
+            .poll_messages(
+                &Identifier::named(STREAM_NAME).unwrap(),
+                &Identifier::named(TOPIC_NAME).unwrap(),
+                None,
+                &consumer,
+                &PollingStrategy::offset(0),
+                1,
+                false,
+            )
+            .await
+            .unwrap();
+        if !polled.messages.is_empty() {
+            assert!(
+                polled_partitions.insert(polled.partition_id),
+                "Duplicate partition {} after disconnect + rejoin!",
+                polled.partition_id
+            );
+        }
+    }
+    assert_eq!(
+        polled_partitions.len(),
+        2,
+        "Both consumers should poll from unique partitions"
+    );
+
+    // Cleanup
+    root_client
+        .delete_stream(&Identifier::named(STREAM_NAME).unwrap())
+        .await
+        .unwrap();
+}
+
+#[iggy_harness(test_client_transport = [Tcp, WebSocket, Quic], server(
+    heartbeat.enabled = true,
+    heartbeat.interval = "2s",
+    tcp.socket.override_defaults = true,
+    tcp.socket.nodelay = true
+))]
+async fn 
should_handle_partition_delete_while_multiple_consumers_polling(harness: 
&TestHarness) {
+    let root_client = harness.root_client().await.unwrap();
+
+    // Start with 6 partitions
+    root_client.create_stream(STREAM_NAME).await.unwrap();
+    root_client
+        .create_topic(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            TOPIC_NAME,
+            6,
+            CompressionAlgorithm::default(),
+            None,
+            IggyExpiry::NeverExpire,
+            MaxTopicSize::ServerDefault,
+        )
+        .await
+        .unwrap();
+    root_client
+        .create_consumer_group(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            &Identifier::named(TOPIC_NAME).unwrap(),
+            CONSUMER_GROUP_NAME,
+        )
+        .await
+        .unwrap();
+
+    // Send messages to all 6 partitions
+    for partition_id in 0..6u32 {
+        for i in 0..5u32 {
+            let message = 
IggyMessage::from_str(&format!("p{partition_id}-m{i}")).unwrap();
+            let mut messages = vec![message];
+            root_client
+                .send_messages(
+                    &Identifier::named(STREAM_NAME).unwrap(),
+                    &Identifier::named(TOPIC_NAME).unwrap(),
+                    &Partitioning::partition_id(partition_id),
+                    &mut messages,
+                )
+                .await
+                .unwrap();
+        }
+    }
+
+    // 1. Three consumers join (6 partitions / 3 = 2 each)
+    let client1 = Arc::new(harness.new_client().await.unwrap());
+    let client2 = Arc::new(harness.new_client().await.unwrap());
+    let client3 = Arc::new(harness.new_client().await.unwrap());
+    for client in [&*client1, &*client2, &*client3] {
+        client
+            .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+            .await
+            .unwrap();
+        join_cg(client).await;
+    }
+
+    sleep(Duration::from_millis(200)).await;
+
+    // 2. Start concurrent poll loops
+    let stop = Arc::new(AtomicBool::new(false));
+    let consumer = 
Consumer::group(Identifier::named(CONSUMER_GROUP_NAME).unwrap());
+
+    let mut poll_tasks = Vec::new();
+    for client in [client1.clone(), client2.clone(), client3.clone()] {
+        let stop = stop.clone();
+        let consumer = consumer.clone();
+        poll_tasks.push(tokio::spawn(async move {
+            let mut count = 0u64;
+            while !stop.load(Ordering::Relaxed) {
+                let polled = client
+                    .poll_messages(
+                        &Identifier::named(STREAM_NAME).unwrap(),
+                        &Identifier::named(TOPIC_NAME).unwrap(),
+                        None,
+                        &consumer,
+                        &PollingStrategy::next(),
+                        1,
+                        true,
+                    )
+                    .await;
+                if let Ok(p) = polled {
+                    count += p.messages.len() as u64;
+                }
+                sleep(Duration::from_millis(10)).await;
+            }
+            count
+        }));
+    }
+
+    // 3. While all 3 consumers are polling, delete 3 partitions (6 → 3)
+    sleep(Duration::from_millis(200)).await;
+    root_client
+        .delete_partitions(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            &Identifier::named(TOPIC_NAME).unwrap(),
+            3,
+        )
+        .await
+        .unwrap();
+
+    // Let polling continue after delete
+    sleep(Duration::from_secs(1)).await;
+    stop.store(true, Ordering::Relaxed);
+    for task in poll_tasks {
+        let _ = task.await;
+    }
+
+    // 4. Verify: 3 members, 3 remaining partitions, no duplicates
+    let cg = get_consumer_group(&root_client).await;
+    assert_eq!(cg.members_count, 3);
+    assert_unique_partition_assignments(&cg);
+    let total: u32 = cg.members.iter().map(|m| m.partitions_count).sum();
+    assert_eq!(
+        total, 3,
+        "Only 3 partitions should remain. Members: {:?}",
+        cg.members
+    );
+    for member in &cg.members {
+        assert_eq!(
+            member.partitions_count, 1,
+            "Each of 3 members should have exactly 1 of 3 remaining 
partitions. Members: {:?}",
+            cg.members
+        );
+    }
+
+    // Cleanup
+    root_client
+        .delete_stream(&Identifier::named(STREAM_NAME).unwrap())
+        .await
+        .unwrap();
+}
+
+#[iggy_harness(test_client_transport = [Tcp, WebSocket, Quic], server(
+    heartbeat.enabled = true,
+    heartbeat.interval = "2s",
+    tcp.socket.override_defaults = true,
+    tcp.socket.nodelay = true
+))]
+async fn should_reach_even_distribution_after_multiple_joins(harness: 
&TestHarness) {
+    let root_client = harness.root_client().await.unwrap();
+
+    // Start with 6 partitions for cleaner math
+    root_client.create_stream(STREAM_NAME).await.unwrap();
+    root_client
+        .create_topic(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            TOPIC_NAME,
+            6,
+            CompressionAlgorithm::default(),
+            None,
+            IggyExpiry::NeverExpire,
+            MaxTopicSize::ServerDefault,
+        )
+        .await
+        .unwrap();
+    root_client
+        .create_consumer_group(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            &Identifier::named(TOPIC_NAME).unwrap(),
+            CONSUMER_GROUP_NAME,
+        )
+        .await
+        .unwrap();
+
+    for partition_id in 0..6u32 {
+        let message = 
IggyMessage::from_str(&format!("msg-p{partition_id}")).unwrap();
+        let mut messages = vec![message];
+        root_client
+            .send_messages(
+                &Identifier::named(STREAM_NAME).unwrap(),
+                &Identifier::named(TOPIC_NAME).unwrap(),
+                &Partitioning::partition_id(partition_id),
+                &mut messages,
+            )
+            .await
+            .unwrap();
+    }
+
+    // 1. Consumer1 joins — gets all 6 partitions
+    let client1 = harness.new_client().await.unwrap();
+    client1
+        .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+        .await
+        .unwrap();
+    join_cg(&client1).await;
+
+    sleep(Duration::from_millis(100)).await;
+    let cg = get_consumer_group(&root_client).await;
+    assert_eq!(cg.members[0].partitions_count, 6);
+
+    // 2. Consumer2 joins — cooperative rebalance moves some partitions
+    //    (never polled → immediate completion)
+    let client2 = harness.new_client().await.unwrap();
+    client2
+        .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+        .await
+        .unwrap();
+    join_cg(&client2).await;
+
+    sleep(Duration::from_millis(200)).await;
+    let cg = get_consumer_group(&root_client).await;
+    assert_eq!(cg.members_count, 2);
+    assert_unique_partition_assignments(&cg);
+    let total: u32 = cg.members.iter().map(|m| m.partitions_count).sum();
+    assert_eq!(total, 6);
+
+    // 3. Consumer3 joins — another round of cooperative rebalance
+    let client3 = harness.new_client().await.unwrap();
+    client3
+        .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+        .await
+        .unwrap();
+    join_cg(&client3).await;
+
+    sleep(Duration::from_millis(200)).await;
+    let cg = get_consumer_group(&root_client).await;
+    assert_eq!(cg.members_count, 3);
+    assert_unique_partition_assignments(&cg);
+    let total: u32 = cg.members.iter().map(|m| m.partitions_count).sum();
+    assert_eq!(total, 6);
+    for member in &cg.members {
+        assert!(
+            member.partitions_count >= 1,
+            "Every member must have at least 1 partition. Members: {:?}",
+            cg.members
+        );
+    }
+
+    // 4. Consumer4 joins — with 6 partitions and 4 members, cooperative 
rebalance
+    //    should give member4 at least 1 partition (fair_share=1, remainder=2)
+    let client4 = harness.new_client().await.unwrap();
+    client4
+        .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+        .await
+        .unwrap();
+    join_cg(&client4).await;
+
+    sleep(Duration::from_millis(200)).await;
+    let cg = get_consumer_group(&root_client).await;
+    assert_eq!(cg.members_count, 4);
+    assert_unique_partition_assignments(&cg);
+    let total: u32 = cg.members.iter().map(|m| m.partitions_count).sum();
+    assert_eq!(total, 6);
+    for member in &cg.members {
+        assert!(
+            member.partitions_count >= 1,
+            "Every member must have at least 1 partition with 4 members and 6 
partitions. Members: {:?}",
+            cg.members
+        );
+    }
+
+    // 5. Consumer5 + Consumer6 join — 6 partitions, 6 consumers → exactly 1 
each
+    let client5 = harness.new_client().await.unwrap();
+    let client6 = harness.new_client().await.unwrap();
+    for client in [&client5, &client6] {
+        client
+            .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+            .await
+            .unwrap();
+        join_cg(client).await;
+        sleep(Duration::from_millis(200)).await;
+    }
+
+    sleep(Duration::from_millis(200)).await;
+    let cg = get_consumer_group(&root_client).await;
+    assert_eq!(cg.members_count, 6);
+    assert_unique_partition_assignments(&cg);
+    let total: u32 = cg.members.iter().map(|m| m.partitions_count).sum();
+    assert_eq!(total, 6);
+    for member in &cg.members {
+        assert_eq!(
+            member.partitions_count, 1,
+            "With 6 members and 6 partitions, each must have exactly 1. 
Members: {:?}",
+            cg.members
+        );
+    }
+
+    // 6. All 6 consumers poll — must get unique partitions
+    let consumer = 
Consumer::group(Identifier::named(CONSUMER_GROUP_NAME).unwrap());
+    let mut polled_partitions = HashSet::new();
+    for client in [&client1, &client2, &client3, &client4, &client5, &client6] 
{
+        let polled = client
+            .poll_messages(
+                &Identifier::named(STREAM_NAME).unwrap(),
+                &Identifier::named(TOPIC_NAME).unwrap(),
+                None,
+                &consumer,
+                &PollingStrategy::offset(0),
+                10,
+                false,
+            )
+            .await
+            .unwrap();
+        if !polled.messages.is_empty() {
+            assert!(
+                polled_partitions.insert(polled.partition_id),
+                "Duplicate partition {} with 6 consumers on 6 partitions!",
+                polled.partition_id
+            );
+        }
+    }
+    assert_eq!(
+        polled_partitions.len(),
+        6,
+        "All 6 partitions must be covered by 6 consumers"
+    );
+
+    // Cleanup
+    root_client
+        .delete_stream(&Identifier::named(STREAM_NAME).unwrap())
+        .await
+        .unwrap();
+}
+
+#[iggy_harness(test_client_transport = [Tcp, WebSocket, Quic], server(
+    heartbeat.enabled = true,
+    heartbeat.interval = "2s",
+    tcp.socket.override_defaults = true,
+    tcp.socket.nodelay = true
+))]
+async fn 
should_split_evenly_when_consumer_joins_after_partitions_added(harness: 
&TestHarness) {
+    let root_client = harness.root_client().await.unwrap();
+
+    setup_stream_topic_cg(&root_client).await;
+    send_one_message_per_partition(&root_client).await;
+
+    // 1. Two consumers join, split 3 partitions (2+1)
+    let client1 = harness.new_client().await.unwrap();
+    let client2 = harness.new_client().await.unwrap();
+    for client in [&client1, &client2] {
+        client
+            .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+            .await
+            .unwrap();
+        join_cg(client).await;
+    }
+
+    sleep(Duration::from_millis(200)).await;
+    let cg = get_consumer_group(&root_client).await;
+    assert_eq!(cg.members_count, 2);
+    let total: u32 = cg.members.iter().map(|m| m.partitions_count).sum();
+    assert_eq!(total, 3);
+
+    // 2. Add 3 partitions (3 -> 6), triggers full rebalance (3 each)
+    root_client
+        .create_partitions(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            &Identifier::named(TOPIC_NAME).unwrap(),
+            3,
+        )
+        .await
+        .unwrap();
+
+    sleep(Duration::from_millis(500)).await;
+
+    // 3. Send messages to new partitions
+    for partition_id in PARTITIONS_COUNT..PARTITIONS_COUNT + 3 {
+        let message =
+            
IggyMessage::from_str(&format!("message-new-partition-{partition_id}")).unwrap();
+        let mut messages = vec![message];
+        root_client
+            .send_messages(
+                &Identifier::named(STREAM_NAME).unwrap(),
+                &Identifier::named(TOPIC_NAME).unwrap(),
+                &Partitioning::partition_id(partition_id),
+                &mut messages,
+            )
+            .await
+            .unwrap();
+    }
+
+    // 4. Consumer3 joins — now 3 consumers, 6 partitions → 2 each
+    let client3 = harness.new_client().await.unwrap();
+    client3
+        .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+        .await
+        .unwrap();
+    join_cg(&client3).await;
+
+    sleep(Duration::from_millis(500)).await;
+
+    let cg = get_consumer_group(&root_client).await;
+    assert_eq!(cg.members_count, 3);
+    assert_unique_partition_assignments(&cg);
+    let total: u32 = cg.members.iter().map(|m| m.partitions_count).sum();
+    assert_eq!(
+        total, 6,
+        "All 6 partitions must be assigned. Members: {:?}",
+        cg.members
+    );
+    for member in &cg.members {
+        assert!(
+            member.partitions_count >= 1,
+            "Every member must have at least 1 partition after cooperative 
rebalance. Members: {:?}",
+            cg.members
+        );
+    }
+
+    // 5. Each consumer polls — must get unique partitions, no duplicates
+    let consumer = 
Consumer::group(Identifier::named(CONSUMER_GROUP_NAME).unwrap());
+    let mut polled_partitions = HashSet::new();
+    for client in [&client1, &client2, &client3] {
+        let polled = client
+            .poll_messages(
+                &Identifier::named(STREAM_NAME).unwrap(),
+                &Identifier::named(TOPIC_NAME).unwrap(),
+                None,
+                &consumer,
+                &PollingStrategy::offset(0),
+                10,
+                false,
+            )
+            .await
+            .unwrap();
+        if !polled.messages.is_empty() {
+            assert!(
+                polled_partitions.insert(polled.partition_id),
+                "Duplicate partition {} polled by multiple consumers!",
+                polled.partition_id
+            );
+        }
+    }
+    assert_eq!(
+        polled_partitions.len(),
+        3,
+        "All 3 consumers should poll from unique partitions"
+    );
+
+    // Cleanup
+    root_client
+        .delete_stream(&Identifier::named(STREAM_NAME).unwrap())
+        .await
+        .unwrap();
+}
+
+#[iggy_harness(test_client_transport = [Tcp, WebSocket, Quic], server(
+    heartbeat.enabled = true,
+    heartbeat.interval = "2s",
+    tcp.socket.override_defaults = true,
+    tcp.socket.nodelay = true
+))]
+async fn 
should_not_duplicate_messages_when_partitions_added_during_polling(harness: 
&TestHarness) {
+    let root_client = harness.root_client().await.unwrap();
+
+    setup_stream_topic_cg(&root_client).await;
+    send_one_message_per_partition(&root_client).await;
+
+    // 1. Two consumers join and start polling
+    let client1 = harness.new_client().await.unwrap();
+    let client2 = harness.new_client().await.unwrap();
+    for client in [&client1, &client2] {
+        client
+            .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+            .await
+            .unwrap();
+        join_cg(client).await;
+    }
+
+    let consumer = 
Consumer::group(Identifier::named(CONSUMER_GROUP_NAME).unwrap());
+
+    // 2. Both consumers poll without committing (in-flight work)
+    for client in [&client1, &client2] {
+        let _ = client
+            .poll_messages(
+                &Identifier::named(STREAM_NAME).unwrap(),
+                &Identifier::named(TOPIC_NAME).unwrap(),
+                None,
+                &consumer,
+                &PollingStrategy::next(),
+                1,
+                false,
+            )
+            .await
+            .unwrap();
+    }
+
+    // 3. Add partitions WHILE consumers are active with 
polled-but-uncommitted data
+    root_client
+        .create_partitions(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            &Identifier::named(TOPIC_NAME).unwrap(),
+            3,
+        )
+        .await
+        .unwrap();
+
+    sleep(Duration::from_millis(500)).await;
+
+    // 4. After rebalance, verify no partition is assigned to both consumers
+    let cg = get_consumer_group(&root_client).await;
+    assert_eq!(cg.members_count, 2);
+    assert_unique_partition_assignments(&cg);
+    let total: u32 = cg.members.iter().map(|m| m.partitions_count).sum();
+    assert_eq!(
+        total, 6,
+        "All 6 partitions must be assigned. Members: {:?}",
+        cg.members
+    );
+
+    // 5. Both consumers poll again — must not get same partition
+    let mut seen_by_client: Vec<HashSet<u32>> = Vec::new();
+    for client in [&client1, &client2] {
+        let mut client_partitions = HashSet::new();
+        for _ in 0..3 {
+            let polled = client
+                .poll_messages(
+                    &Identifier::named(STREAM_NAME).unwrap(),
+                    &Identifier::named(TOPIC_NAME).unwrap(),
+                    None,
+                    &consumer,
+                    &PollingStrategy::offset(0),
+                    1,
+                    false,
+                )
+                .await
+                .unwrap();
+            if !polled.messages.is_empty() {
+                client_partitions.insert(polled.partition_id);
+            }
+        }
+        seen_by_client.push(client_partitions);
+    }
+    let overlap: HashSet<_> = 
seen_by_client[0].intersection(&seen_by_client[1]).collect();
+    assert!(
+        overlap.is_empty(),
+        "Consumers polled overlapping partitions after rebalance: {:?}",
+        overlap
+    );
+
+    // Cleanup
+    root_client
+        .delete_stream(&Identifier::named(STREAM_NAME).unwrap())
+        .await
+        .unwrap();
+}
+
+#[iggy_harness(test_client_transport = [Tcp, WebSocket, Quic], server(
+    heartbeat.enabled = true,
+    heartbeat.interval = "2s",
+    tcp.socket.override_defaults = true,
+    tcp.socket.nodelay = true
+))]
+async fn should_handle_delete_partitions_with_uncommitted_work(harness: &TestHarness) {
+    let root_client = harness.root_client().await.unwrap();
+
+    // Create with 6 partitions
+    root_client.create_stream(STREAM_NAME).await.unwrap();
+    root_client
+        .create_topic(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            TOPIC_NAME,
+            6,
+            CompressionAlgorithm::default(),
+            None,
+            IggyExpiry::NeverExpire,
+            MaxTopicSize::ServerDefault,
+        )
+        .await
+        .unwrap();
+    root_client
+        .create_consumer_group(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            &Identifier::named(TOPIC_NAME).unwrap(),
+            CONSUMER_GROUP_NAME,
+        )
+        .await
+        .unwrap();
+
+    // Send messages to all 6 partitions
+    for partition_id in 0..6u32 {
+        let message = IggyMessage::from_str(&format!("msg-{partition_id}")).unwrap();
+        let mut messages = vec![message];
+        root_client
+            .send_messages(
+                &Identifier::named(STREAM_NAME).unwrap(),
+                &Identifier::named(TOPIC_NAME).unwrap(),
+                &Partitioning::partition_id(partition_id),
+                &mut messages,
+            )
+            .await
+            .unwrap();
+    }
+
+    // 1. Two consumers join, each gets 3 partitions
+    let client1 = harness.new_client().await.unwrap();
+    let client2 = harness.new_client().await.unwrap();
+    for client in [&client1, &client2] {
+        client
+            .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+            .await
+            .unwrap();
+        join_cg(client).await;
+    }
+
+    // 2. Both consumers poll without committing
+    let consumer = Consumer::group(Identifier::named(CONSUMER_GROUP_NAME).unwrap());
+    for client in [&client1, &client2] {
+        let _ = client
+            .poll_messages(
+                &Identifier::named(STREAM_NAME).unwrap(),
+                &Identifier::named(TOPIC_NAME).unwrap(),
+                None,
+                &consumer,
+                &PollingStrategy::next(),
+                1,
+                false,
+            )
+            .await
+            .unwrap();
+    }
+
+    // 3. Delete 3 partitions while consumers have uncommitted work
+    root_client
+        .delete_partitions(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            &Identifier::named(TOPIC_NAME).unwrap(),
+            3,
+        )
+        .await
+        .unwrap();
+
+    sleep(Duration::from_millis(500)).await;
+
+    // 4. After rebalance: 3 remaining partitions, no duplicates, even split
+    let cg = get_consumer_group(&root_client).await;
+    assert_eq!(cg.members_count, 2);
+    assert_unique_partition_assignments(&cg);
+    let total: u32 = cg.members.iter().map(|m| m.partitions_count).sum();
+    assert_eq!(
+        total, 3,
+        "3 remaining partitions must be assigned. Members: {:?}",
+        cg.members
+    );
+    for member in &cg.members {
+        assert!(
+            member.partitions_count > 0,
+            "Every member should have at least 1 partition. Members: {:?}",
+            cg.members
+        );
+    }
+
+    // 5. Consumers can still poll from remaining partitions without error
+    for client in [&client1, &client2] {
+        let polled = client
+            .poll_messages(
+                &Identifier::named(STREAM_NAME).unwrap(),
+                &Identifier::named(TOPIC_NAME).unwrap(),
+                None,
+                &consumer,
+                &PollingStrategy::offset(0),
+                1,
+                false,
+            )
+            .await
+            .unwrap();
+        assert!(
+            !polled.messages.is_empty(),
+            "Consumer should still be able to poll from remaining partitions"
+        );
+    }
+
+    // Cleanup
+    root_client
+        .delete_stream(&Identifier::named(STREAM_NAME).unwrap())
+        .await
+        .unwrap();
+}
+
+#[iggy_harness(test_client_transport = [Tcp, WebSocket, Quic], server(
+    heartbeat.enabled = true,
+    heartbeat.interval = "2s",
+    tcp.socket.override_defaults = true,
+    tcp.socket.nodelay = true
+))]
+async fn should_handle_rapid_partition_changes_with_active_consumers(harness: &TestHarness) {
+    let root_client = harness.root_client().await.unwrap();
+
+    setup_stream_topic_cg(&root_client).await;
+    send_one_message_per_partition(&root_client).await;
+
+    // 1. Two consumers join
+    let client1 = harness.new_client().await.unwrap();
+    let client2 = harness.new_client().await.unwrap();
+    for client in [&client1, &client2] {
+        client
+            .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+            .await
+            .unwrap();
+        join_cg(client).await;
+    }
+
+    sleep(Duration::from_millis(200)).await;
+
+    // 2. Add partitions, then add more, then consumer3 joins — rapid changes
+    root_client
+        .create_partitions(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            &Identifier::named(TOPIC_NAME).unwrap(),
+            3,
+        )
+        .await
+        .unwrap();
+
+    // Send messages to new partitions
+    for partition_id in 3..6u32 {
+        let message = IggyMessage::from_str(&format!("msg-{partition_id}")).unwrap();
+        let mut messages = vec![message];
+        root_client
+            .send_messages(
+                &Identifier::named(STREAM_NAME).unwrap(),
+                &Identifier::named(TOPIC_NAME).unwrap(),
+                &Partitioning::partition_id(partition_id),
+                &mut messages,
+            )
+            .await
+            .unwrap();
+    }
+
+    root_client
+        .create_partitions(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            &Identifier::named(TOPIC_NAME).unwrap(),
+            3,
+        )
+        .await
+        .unwrap();
+
+    // Send messages to newest partitions
+    for partition_id in 6..9u32 {
+        let message = IggyMessage::from_str(&format!("msg-{partition_id}")).unwrap();
+        let mut messages = vec![message];
+        root_client
+            .send_messages(
+                &Identifier::named(STREAM_NAME).unwrap(),
+                &Identifier::named(TOPIC_NAME).unwrap(),
+                &Partitioning::partition_id(partition_id),
+                &mut messages,
+            )
+            .await
+            .unwrap();
+    }
+
+    let client3 = harness.new_client().await.unwrap();
+    client3
+        .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+        .await
+        .unwrap();
+    join_cg(&client3).await;
+
+    sleep(Duration::from_millis(500)).await;
+
+    // 3. 9 partitions, 3 consumers → 3 each
+    let cg = get_consumer_group(&root_client).await;
+    assert_eq!(cg.members_count, 3);
+    assert_unique_partition_assignments(&cg);
+    let total: u32 = cg.members.iter().map(|m| m.partitions_count).sum();
+    assert_eq!(
+        total, 9,
+        "All 9 partitions must be assigned. Members: {:?}",
+        cg.members
+    );
+    for member in &cg.members {
+        assert!(
+            member.partitions_count >= 1,
+                "Every member must have at least 1 partition after cooperative rebalance. Members: {:?}",
+            cg.members
+        );
+    }
+
+    // 4. All consumers poll — unique partitions only
+    let consumer = Consumer::group(Identifier::named(CONSUMER_GROUP_NAME).unwrap());
+    let mut polled_partitions = HashSet::new();
+    for client in [&client1, &client2, &client3] {
+        let polled = client
+            .poll_messages(
+                &Identifier::named(STREAM_NAME).unwrap(),
+                &Identifier::named(TOPIC_NAME).unwrap(),
+                None,
+                &consumer,
+                &PollingStrategy::offset(0),
+                10,
+                false,
+            )
+            .await
+            .unwrap();
+        if !polled.messages.is_empty() {
+            assert!(
+                polled_partitions.insert(polled.partition_id),
+                "Duplicate partition {} after rapid changes!",
+                polled.partition_id
+            );
+        }
+    }
+    assert_eq!(
+        polled_partitions.len(),
+        3,
+        "All 3 consumers should poll from unique partitions"
+    );
+
+    // Cleanup
+    root_client
+        .delete_stream(&Identifier::named(STREAM_NAME).unwrap())
+        .await
+        .unwrap();
+}
+
+#[iggy_harness(test_client_transport = [Tcp, WebSocket, Quic], server(
+    heartbeat.enabled = true,
+    heartbeat.interval = "2s",
+    tcp.socket.override_defaults = true,
+    tcp.socket.nodelay = true
+))]
+async fn should_rebalance_after_adding_partitions(harness: &TestHarness) {
+    let root_client = harness.root_client().await.unwrap();
+
+    setup_stream_topic_cg(&root_client).await;
+    send_one_message_per_partition(&root_client).await;
+
+    // 1. Three consumers join, each gets 1 of 3 partitions
+    let client1 = harness.new_client().await.unwrap();
+    let client2 = harness.new_client().await.unwrap();
+    let client3 = harness.new_client().await.unwrap();
+    for client in [&client1, &client2, &client3] {
+        client
+            .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+            .await
+            .unwrap();
+        join_cg(client).await;
+    }
+
+    sleep(Duration::from_millis(200)).await;
+
+    let cg = get_consumer_group(&root_client).await;
+    assert_eq!(cg.members_count, 3);
+    assert_unique_partition_assignments(&cg);
+    for member in &cg.members {
+        assert_eq!(
+            member.partitions_count, 1,
+            "Each member should have 1 partition before adding. Members: {:?}",
+            cg.members
+        );
+    }
+
+    // 2. Add 3 more partitions (3 -> 6 total)
+    root_client
+        .create_partitions(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            &Identifier::named(TOPIC_NAME).unwrap(),
+            3,
+        )
+        .await
+        .unwrap();
+
+    sleep(Duration::from_millis(500)).await;
+
+    // 3. After rebalance, each consumer should have 2 partitions (6/3)
+    let cg = get_consumer_group(&root_client).await;
+    assert_eq!(cg.members_count, 3);
+    assert_unique_partition_assignments(&cg);
+    let total: u32 = cg.members.iter().map(|m| m.partitions_count).sum();
+    assert_eq!(
+        total, 6,
+        "All 6 partitions must be assigned. Members: {:?}",
+        cg.members
+    );
+    for member in &cg.members {
+        assert_eq!(
+            member.partitions_count, 2,
+            "Each member should have 2 partitions after adding 3 more. Members: {:?}",
+            cg.members
+        );
+    }
+
+    // Cleanup
+    root_client
+        .delete_stream(&Identifier::named(STREAM_NAME).unwrap())
+        .await
+        .unwrap();
+}
+
+#[iggy_harness(test_client_transport = [Tcp, WebSocket, Quic], server(
+    heartbeat.enabled = true,
+    heartbeat.interval = "2s",
+    tcp.socket.override_defaults = true,
+    tcp.socket.nodelay = true
+))]
+async fn should_rebalance_after_deleting_partitions(harness: &TestHarness) {
+    let root_client = harness.root_client().await.unwrap();
+
+    // Create with 6 partitions
+    root_client.create_stream(STREAM_NAME).await.unwrap();
+    root_client
+        .create_topic(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            TOPIC_NAME,
+            6,
+            CompressionAlgorithm::default(),
+            None,
+            IggyExpiry::NeverExpire,
+            MaxTopicSize::ServerDefault,
+        )
+        .await
+        .unwrap();
+    root_client
+        .create_consumer_group(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            &Identifier::named(TOPIC_NAME).unwrap(),
+            CONSUMER_GROUP_NAME,
+        )
+        .await
+        .unwrap();
+
+    // 1. Three consumers join, each gets 2 of 6 partitions
+    let client1 = harness.new_client().await.unwrap();
+    let client2 = harness.new_client().await.unwrap();
+    let client3 = harness.new_client().await.unwrap();
+    for client in [&client1, &client2, &client3] {
+        client
+            .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+            .await
+            .unwrap();
+        join_cg(client).await;
+    }
+
+    sleep(Duration::from_millis(200)).await;
+
+    let cg = get_consumer_group(&root_client).await;
+    assert_eq!(cg.members_count, 3);
+    let total: u32 = cg.members.iter().map(|m| m.partitions_count).sum();
+    assert_eq!(total, 6);
+
+    // 2. Delete 3 partitions (6 -> 3)
+    root_client
+        .delete_partitions(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            &Identifier::named(TOPIC_NAME).unwrap(),
+            3,
+        )
+        .await
+        .unwrap();
+
+    sleep(Duration::from_millis(500)).await;
+
+    // 3. After rebalance, each consumer should have 1 partition (3/3)
+    let cg = get_consumer_group(&root_client).await;
+    assert_eq!(cg.members_count, 3);
+    assert_unique_partition_assignments(&cg);
+    let total: u32 = cg.members.iter().map(|m| m.partitions_count).sum();
+    assert_eq!(
+        total, 3,
+        "All 3 remaining partitions must be assigned. Members: {:?}",
+        cg.members
+    );
+    for member in &cg.members {
+        assert_eq!(
+            member.partitions_count, 1,
+            "Each member should have 1 partition after deleting 3. Members: {:?}",
+            cg.members
+        );
+    }
+
+    // Cleanup
+    root_client
+        .delete_stream(&Identifier::named(STREAM_NAME).unwrap())
+        .await
+        .unwrap();
+}
+
+#[iggy_harness(test_client_transport = [Tcp, WebSocket, Quic], server(
+    heartbeat.enabled = true,
+    heartbeat.interval = "2s",
+    tcp.socket.override_defaults = true,
+    tcp.socket.nodelay = true
+))]
+async fn should_handle_partition_add_during_pending_revocation(harness: &TestHarness) {
+    let root_client = harness.root_client().await.unwrap();
+
+    setup_stream_topic_cg(&root_client).await;
+    send_one_message_per_partition(&root_client).await;
+
+    // 1. Consumer1 joins, polls all 3 partitions without committing
+    let client1 = harness.new_client().await.unwrap();
+    client1
+        .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+        .await
+        .unwrap();
+    join_cg(&client1).await;
+
+    let consumer = Consumer::group(Identifier::named(CONSUMER_GROUP_NAME).unwrap());
+    for _ in 0..PARTITIONS_COUNT {
+        client1
+            .poll_messages(
+                &Identifier::named(STREAM_NAME).unwrap(),
+                &Identifier::named(TOPIC_NAME).unwrap(),
+                None,
+                &consumer,
+                &PollingStrategy::next(),
+                1,
+                false,
+            )
+            .await
+            .unwrap();
+    }
+
+    // 2. Consumer2 joins — triggers pending revocations
+    let client2 = harness.new_client().await.unwrap();
+    client2
+        .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+        .await
+        .unwrap();
+    join_cg(&client2).await;
+
+    sleep(Duration::from_millis(100)).await;
+
+    // 3. Add partitions WHILE revocations are pending
+    root_client
+        .create_partitions(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            &Identifier::named(TOPIC_NAME).unwrap(),
+            3,
+        )
+        .await
+        .unwrap();
+
+    sleep(Duration::from_millis(500)).await;
+
+    // 4. After full rebalance triggered by partition change,
+    //    all 6 partitions should be distributed, no duplicates
+    let cg = get_consumer_group(&root_client).await;
+    assert_eq!(cg.members_count, 2);
+    assert_unique_partition_assignments(&cg);
+    let total: u32 = cg.members.iter().map(|m| m.partitions_count).sum();
+    assert_eq!(
+        total, 6,
+        "All 6 partitions must be assigned after add during pending revocation. Members: {:?}",
+        cg.members
+    );
+    for member in &cg.members {
+        assert_eq!(
+            member.partitions_count, 3,
+            "Each of 2 members should have 3 of 6 partitions. Members: {:?}",
+            cg.members
+        );
+    }
+
+    // Cleanup
+    root_client
+        .delete_stream(&Identifier::named(STREAM_NAME).unwrap())
+        .await
+        .unwrap();
+}
+
 #[iggy_harness(test_client_transport = [Tcp, WebSocket, Quic], server(
     consumer_group.rebalancing_timeout = "3s",
     consumer_group.rebalancing_check_interval = "1s"
@@ -750,11 +2116,9 @@ async fn get_consumer_group(client: &IggyClient) -> ConsumerGroupDetails {
 
 fn assert_unique_partition_assignments(cg: &ConsumerGroupDetails) {
     let mut all_partitions = HashSet::new();
-    let mut total_assigned = 0u32;
 
     for member in &cg.members {
         for &partition in &member.partitions {
-            total_assigned += 1;
             assert!(
                 all_partitions.insert(partition),
                 "Partition {partition} is assigned to multiple members! \
@@ -764,13 +2128,6 @@ fn assert_unique_partition_assignments(cg: &ConsumerGroupDetails) {
             );
         }
     }
-
-    assert_eq!(
-        total_assigned, PARTITIONS_COUNT,
-        "Expected {PARTITIONS_COUNT} total partition assignments, got {total_assigned}. \
-         Members: {:?}",
-        cg.members
-    );
 }
 
 async fn setup_stream_topic_cg(client: &IggyClient) {
@@ -1614,3 +2971,217 @@ async fn should_distribute_partitions_evenly_with_concurrent_joins(harness: &Tes
         .await
         .unwrap();
 }
+
+#[iggy_harness(test_client_transport = [Tcp, WebSocket, Quic], server(
+    heartbeat.enabled = true,
+    heartbeat.interval = "2s",
+    tcp.socket.override_defaults = true,
+    tcp.socket.nodelay = true
+))]
+async fn should_not_assign_partition_to_wrong_member_after_slab_reuse(harness: &TestHarness) {
+    let root_client = harness.root_client().await.unwrap();
+
+    setup_stream_topic_cg(&root_client).await;
+    send_one_message_per_partition(&root_client).await;
+
+    // 1. Consumer1 joins, gets all 3 partitions, polls without committing
+    let client1 = harness.new_client().await.unwrap();
+    client1
+        .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+        .await
+        .unwrap();
+    join_cg(&client1).await;
+
+    let consumer = Consumer::group(Identifier::named(CONSUMER_GROUP_NAME).unwrap());
+    for _ in 0..PARTITIONS_COUNT {
+        client1
+            .poll_messages(
+                &Identifier::named(STREAM_NAME).unwrap(),
+                &Identifier::named(TOPIC_NAME).unwrap(),
+                None,
+                &consumer,
+                &PollingStrategy::next(),
+                1,
+                false,
+            )
+            .await
+            .unwrap();
+    }
+
+    // 2. Consumer2 joins — triggers pending revocation targeting consumer2's slab
+    let client2 = harness.new_client().await.unwrap();
+    client2
+        .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+        .await
+        .unwrap();
+    join_cg(&client2).await;
+
+    sleep(Duration::from_millis(100)).await;
+
+    let cg = get_consumer_group(&root_client).await;
+    assert_eq!(cg.members_count, 2);
+
+    // 3. Consumer2 (revocation target) disconnects — its slab is freed
+    drop(client2);
+    sleep(Duration::from_secs(3)).await;
+
+    // 4. Consumer3 joins — may reuse consumer2's old slab
+    let client3 = harness.new_client().await.unwrap();
+    client3
+        .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+        .await
+        .unwrap();
+    join_cg(&client3).await;
+
+    // 5. Consumer1 commits all offsets — revocation completion should detect
+    //    slab reuse via target_member_id validation and trigger full rebalance
+    for partition_id in 0..PARTITIONS_COUNT {
+        client1
+            .store_consumer_offset(
+                &consumer,
+                &Identifier::named(STREAM_NAME).unwrap(),
+                &Identifier::named(TOPIC_NAME).unwrap(),
+                Some(partition_id),
+                0,
+            )
+            .await
+            .unwrap();
+    }
+
+    sleep(Duration::from_millis(500)).await;
+
+    // 6. Verify no partition duplication and all partitions assigned
+    let cg = get_consumer_group(&root_client).await;
+    assert_unique_partition_assignments(&cg);
+    let total: u32 = cg.members.iter().map(|m| m.partitions_count).sum();
+    assert_eq!(
+        total, PARTITIONS_COUNT,
+        "All partitions must be assigned after slab reuse scenario. Members: {:?}",
+        cg.members
+    );
+
+    // Cleanup
+    root_client
+        .delete_stream(&Identifier::named(STREAM_NAME).unwrap())
+        .await
+        .unwrap();
+}
+
+#[iggy_harness(test_client_transport = [Tcp, WebSocket, Quic], server(
+    heartbeat.enabled = true,
+    heartbeat.interval = "2s",
+    tcp.socket.override_defaults = true,
+    tcp.socket.nodelay = true
+))]
+async fn should_not_complete_other_members_revocations_on_leave(harness: &TestHarness) {
+    let root_client = harness.root_client().await.unwrap();
+
+    setup_stream_topic_cg(&root_client).await;
+    send_one_message_per_partition(&root_client).await;
+
+    // 1. Consumer1 joins, gets all 3 partitions, polls without committing
+    let client1 = harness.new_client().await.unwrap();
+    client1
+        .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+        .await
+        .unwrap();
+    join_cg(&client1).await;
+
+    let consumer = Consumer::group(Identifier::named(CONSUMER_GROUP_NAME).unwrap());
+    for _ in 0..PARTITIONS_COUNT {
+        client1
+            .poll_messages(
+                &Identifier::named(STREAM_NAME).unwrap(),
+                &Identifier::named(TOPIC_NAME).unwrap(),
+                None,
+                &consumer,
+                &PollingStrategy::next(),
+                1,
+                false,
+            )
+            .await
+            .unwrap();
+    }
+
+    // 2. Consumer2 joins — triggers pending revocation for 1 partition
+    let client2 = harness.new_client().await.unwrap();
+    client2
+        .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+        .await
+        .unwrap();
+    join_cg(&client2).await;
+
+    // 3. Consumer3 joins — triggers another pending revocation
+    let client3 = harness.new_client().await.unwrap();
+    client3
+        .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+        .await
+        .unwrap();
+    join_cg(&client3).await;
+
+    sleep(Duration::from_millis(100)).await;
+
+    // Consumer1 has in-flight work on all partitions (polled, not committed).
+    // Consumer2 and consumer3 have 0 partitions (waiting for revocations).
+    let cg = get_consumer_group(&root_client).await;
+    assert_eq!(cg.members_count, 3);
+
+    // 4. Consumer3 disconnects — triggers full rebalance (rebalance_members).
+    //    Leave clears only consumer3's polled offsets, not consumer1's.
+    drop(client3);
+    sleep(Duration::from_secs(3)).await;
+
+    // 5. After full rebalance, consumer1 and consumer2 split all partitions.
+    //    The key invariant: no partition is assigned to two members.
+    let cg = get_consumer_group(&root_client).await;
+    assert_eq!(cg.members_count, 2);
+    assert_unique_partition_assignments(&cg);
+    let total: u32 = cg.members.iter().map(|m| m.partitions_count).sum();
+    assert_eq!(
+        total, PARTITIONS_COUNT,
+        "All partitions must be assigned after leave. Members: {:?}",
+        cg.members
+    );
+    for member in &cg.members {
+        assert!(
+            member.partitions_count > 0,
+            "Both remaining members should have partitions after leave rebalance. Members: {:?}",
+            cg.members
+        );
+    }
+
+    // 6. Both consumers can poll without duplicates
+    let mut all_partitions = HashSet::new();
+    for client in [&client1, &client2] {
+        let polled = client
+            .poll_messages(
+                &Identifier::named(STREAM_NAME).unwrap(),
+                &Identifier::named(TOPIC_NAME).unwrap(),
+                None,
+                &consumer,
+                &PollingStrategy::offset(0),
+                1,
+                false,
+            )
+            .await
+            .unwrap();
+        if !polled.messages.is_empty() {
+            assert!(
+                all_partitions.insert(polled.partition_id),
+                "Duplicate partition {} after leave rebalance",
+                polled.partition_id
+            );
+        }
+    }
+    assert_eq!(
+        all_partitions.len(),
+        2,
+        "Both consumers should poll from unique partitions after leave"
+    );
+
+    // Cleanup
+    root_client
+        .delete_stream(&Identifier::named(STREAM_NAME).unwrap())
+        .await
+        .unwrap();
+}
diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs
index ea10b1816..3d95fb9c3 100644
--- a/core/server/src/bootstrap.rs
+++ b/core/server/src/bootstrap.rs
@@ -657,7 +657,6 @@ pub fn build_inner_metadata(
         users_can_send_all_streams: Default::default(),
         users_can_poll_stream: Default::default(),
         users_can_send_stream: Default::default(),
-        pending_revocations_count: Default::default(),
     }
 }
 
diff --git a/core/server/src/metadata/absorb.rs b/core/server/src/metadata/absorb.rs
index ef224a0c3..a90734f2a 100644
--- a/core/server/src/metadata/absorb.rs
+++ b/core/server/src/metadata/absorb.rs
@@ -309,7 +309,7 @@ fn apply_op(
                 if !group.members.iter().any(|(_, m)| m.client_id == *client_id) {
                     let new_member = ConsumerGroupMemberMeta::new(next_id, *client_id);
                     group.members.insert(new_member);
-                    group.assign_partitions_to_new_members();
+                    group.rebalance_cooperative();
                 }
 
                 if populate_ids {
@@ -343,13 +343,23 @@ fn apply_op(
                     if populate_ids {
                         removed_member_id.store(member_id, Ordering::Release);
                     }
+                    // Partitions owned by the leaving member
+                    let leaving_partitions: Vec<usize> = group
+                        .members
+                        .get(member_id)
+                        .map(|m| m.partitions.clone())
+                        .unwrap_or_default();
+
                     group.members.remove(member_id);
                     group.rebalance_members();
 
+                    // Clear polled offsets only for the leaving member's partitions
                     let consumer_group_id = ConsumerGroupId(*group_id);
-                    for partition in topic.partitions.iter() {
-                        let guard = partition.last_polled_offsets.pin();
-                        guard.remove(&consumer_group_id);
+                    for partition_id in leaving_partitions {
+                        if let Some(partition) = topic.partitions.get(partition_id) {
+                            let guard = partition.last_polled_offsets.pin();
+                            guard.remove(&consumer_group_id);
+                        }
                     }
                 }
             }
@@ -388,11 +398,13 @@ fn apply_op(
             member_slab_id,
             member_id,
             partition_id,
+            timed_out: _,
         } => {
             if let Some(stream) = metadata.streams.get_mut(*stream_id)
                 && let Some(topic) = stream.topics.get_mut(*topic_id)
                 && let Some(group) = topic.consumer_groups.get_mut(*group_id)
             {
+                // Pre-validated by maybe_complete_pending_revocation before dispatch.
                 group.complete_revocation(*member_slab_id, *member_id, *partition_id);
             }
         }
diff --git a/core/server/src/metadata/consumer_group.rs b/core/server/src/metadata/consumer_group.rs
index d26734fc1..c29a82e37 100644
--- a/core/server/src/metadata/consumer_group.rs
+++ b/core/server/src/metadata/consumer_group.rs
@@ -26,6 +26,7 @@ use slab::Slab;
 use std::collections::HashSet;
 use std::sync::Arc;
 use std::sync::atomic::Ordering;
+use tracing::warn;
 
 #[derive(Clone, Debug)]
 pub struct ConsumerGroupMeta {
@@ -66,8 +67,9 @@ impl ConsumerGroupMeta {
         }
     }
 
-    /// Incremental partition assignment for a new member join.
-    pub fn assign_partitions_to_new_members(&mut self) {
+    /// Cooperative rebalance: assign unassigned partitions to idle members and
+    /// mark excess partitions on over-assigned members as pending revocation.
+    pub fn rebalance_cooperative(&mut self) {
         let member_count = self.members.len();
         if member_count == 0 || self.partitions.is_empty() {
             return;
@@ -200,10 +202,16 @@ impl ConsumerGroupMeta {
                     break;
                 }
                 let idle_id = idle_slab_ids.remove(0);
+                let target_member_id = self
+                    .members
+                    .get(idle_id)
+                    .map(|m| m.id)
+                    .unwrap_or(usize::MAX);
                 if let Some(member) = self.members.get_mut(mid) {
                     member.pending_revocations.push(PendingRevocation {
                         partition_id,
                         target_slab_id: idle_id,
+                        target_member_id,
                         created_at_micros: IggyTimestamp::now().as_micros(),
                     });
                 }
@@ -226,7 +234,7 @@ impl ConsumerGroupMeta {
                 };
                 let last_polled = {
                     let guard = partition.last_polled_offsets.pin();
-                    guard.get(&cg_id).map(|v| v.load(Ordering::Relaxed))
+                    guard.get(&cg_id).map(|v| v.load(Ordering::Acquire))
                 };
                 let can_complete = match last_polled {
                     None => true,
@@ -234,7 +242,7 @@ impl ConsumerGroupMeta {
                         let offsets_guard = 
partition.consumer_group_offsets.pin();
                         offsets_guard
                             .get(&cg_id)
-                            .map(|offset| 
offset.offset.load(Ordering::Relaxed))
+                            .map(|offset| 
offset.offset.load(Ordering::Acquire))
                             .is_some_and(|committed| committed >= polled)
                     }
                 };
@@ -257,8 +265,12 @@ impl ConsumerGroupMeta {
         member_id: usize,
         partition_id: PartitionId,
     ) -> bool {
-        let target_slab_id = if let Some(member) = self.members.get_mut(member_slab_id) {
+        let target_info = if let Some(member) = self.members.get_mut(member_slab_id) {
             if member.id != member_id {
+                warn!(
+                    "Revocation rejected: member ID mismatch (slab={member_slab_id}, expected={member_id}, actual={})",
+                    member.id
+                );
                 return false;
             }
             let pos = member
@@ -267,21 +279,33 @@ impl ConsumerGroupMeta {
                 .position(|revocation| revocation.partition_id == 
partition_id);
             if let Some(pos) = pos {
                 let removed = member.pending_revocations.remove(pos);
-                let target = removed.target_slab_id;
                 member.partitions.retain(|&p| p != partition_id);
-                Some(target)
+                Some((removed.target_slab_id, removed.target_member_id))
             } else {
+                warn!(
+                    "Revocation rejected: no pending revocation for 
partition={partition_id} on slab={member_slab_id}"
+                );
                 None
             }
         } else {
+            warn!("Revocation rejected: source slab={member_slab_id} not 
found");
             None
         };
 
-        if let Some(target) = target_slab_id {
-            if let Some(target_member) = self.members.get_mut(target) {
+        if let Some((target_slab, expected_target_id)) = target_info {
+            if let Some(target_member) = self.members.get_mut(target_slab) {
+                if target_member.id != expected_target_id {
+                    warn!(
+                        "Revocation target slab={target_slab} reused 
(expected={expected_target_id}, actual={}), full rebalance",
+                        target_member.id
+                    );
+                    self.rebalance_members();
+                    return true;
+                }
                 target_member.partitions.push(partition_id);
                 return true;
             }
+            warn!("Revocation target slab={target_slab} gone, full rebalance");
             self.rebalance_members();
             return true;
         }
diff --git a/core/server/src/metadata/consumer_group_member.rs 
b/core/server/src/metadata/consumer_group_member.rs
index d7c7486c1..7605de0cc 100644
--- a/core/server/src/metadata/consumer_group_member.rs
+++ b/core/server/src/metadata/consumer_group_member.rs
@@ -24,6 +24,7 @@ use std::sync::atomic::AtomicUsize;
 pub struct PendingRevocation {
     pub partition_id: PartitionId,
     pub target_slab_id: usize,
+    pub target_member_id: usize,
     pub created_at_micros: u64,
 }
 
diff --git a/core/server/src/metadata/inner.rs 
b/core/server/src/metadata/inner.rs
index 36195fb5f..4bcc165f5 100644
--- a/core/server/src/metadata/inner.rs
+++ b/core/server/src/metadata/inner.rs
@@ -20,7 +20,6 @@ use ahash::{AHashMap, AHashSet};
 use iggy_common::{GlobalPermissions, PersonalAccessToken, StreamPermissions};
 use slab::Slab;
 use std::sync::Arc;
-use std::sync::atomic::AtomicUsize;
 
 #[derive(Clone, Default)]
 pub struct InnerMetadata {
@@ -46,9 +45,6 @@ pub struct InnerMetadata {
     pub users_can_send_all_streams: AHashSet<UserId>,
     pub users_can_poll_stream: AHashSet<(UserId, StreamId)>,
     pub users_can_send_stream: AHashSet<(UserId, StreamId)>,
-
-    /// Global pending revocation count, shared across left-right copies.
-    pub pending_revocations_count: Arc<AtomicUsize>,
 }
 
 impl InnerMetadata {
diff --git a/core/server/src/metadata/ops.rs b/core/server/src/metadata/ops.rs
index e9e38148d..a6d80c237 100644
--- a/core/server/src/metadata/ops.rs
+++ b/core/server/src/metadata/ops.rs
@@ -129,5 +129,6 @@ pub enum MetadataOp {
         member_slab_id: usize,
         member_id: usize,
         partition_id: PartitionId,
+        timed_out: bool,
     },
 }
diff --git a/core/server/src/metadata/reader.rs 
b/core/server/src/metadata/reader.rs
index dfcd0c0ef..ba60e177f 100644
--- a/core/server/src/metadata/reader.rs
+++ b/core/server/src/metadata/reader.rs
@@ -253,83 +253,6 @@ impl Metadata {
         Some(current % partitions_count)
     }
 
-    pub fn get_next_member_partition_id(
-        &self,
-        stream_id: StreamId,
-        topic_id: TopicId,
-        group_id: ConsumerGroupId,
-        member_id: usize,
-        calculate: bool,
-    ) -> Option<PartitionId> {
-        let metadata = self.load();
-        let member = metadata
-            .streams
-            .get(stream_id)?
-            .topics
-            .get(topic_id)?
-            .consumer_groups
-            .get(group_id)?
-            .members
-            .get(member_id)?;
-
-        // Fast path: no pending revocations
-        if member.pending_revocations.is_empty() {
-            let partitions = &member.partitions;
-            let count = partitions.len();
-            if count == 0 {
-                return None;
-            }
-            let counter = &member.partition_index;
-            return if calculate {
-                let current = counter
-                    .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |c| {
-                        Some((c + 1) % count)
-                    })
-                    .unwrap();
-                Some(partitions[current % count])
-            } else {
-                let current = counter.load(Ordering::Relaxed);
-                Some(partitions[current % count])
-            };
-        }
-
-        // Slow path: skip revoked partitions via linear scan
-        let effective_count = member.partitions.len() - 
member.pending_revocations.len();
-        if effective_count == 0 {
-            return None;
-        }
-
-        let counter = &member.partition_index;
-        let idx = if calculate {
-            counter
-                .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |c| {
-                    Some((c + 1) % effective_count)
-                })
-                .unwrap()
-                % effective_count
-        } else {
-            counter.load(Ordering::Relaxed) % effective_count
-        };
-
-        // Find the idx-th non-revoked partition
-        let mut seen = 0;
-        for &pid in &member.partitions {
-            let is_revoked = member
-                .pending_revocations
-                .iter()
-                .any(|revocation| revocation.partition_id == pid);
-            if is_revoked {
-                continue;
-            }
-            if seen == idx {
-                return Some(pid);
-            }
-            seen += 1;
-        }
-
-        None
-    }
-
     /// Resolve consumer group partition under a single metadata read guard.
     pub fn resolve_consumer_group_partition(
         &self,
@@ -523,7 +446,7 @@ impl Metadata {
             let guard = partition.last_polled_offsets.pin();
             match guard.get(&key) {
                 Some(existing) => {
-                    existing.store(offset, Ordering::Relaxed);
+                    existing.store(offset, Ordering::Release);
                 }
                 None => {
                     guard.insert(key, 
Arc::new(std::sync::atomic::AtomicU64::new(offset)));
@@ -532,30 +455,6 @@ impl Metadata {
         }
     }
 
-    /// Get the last offset polled by a consumer group for a partition.
-    pub fn get_last_polled_offset(
-        &self,
-        stream_id: StreamId,
-        topic_id: TopicId,
-        group_id: ConsumerGroupId,
-        partition_id: PartitionId,
-    ) -> Option<u64> {
-        use crate::streaming::polling_consumer::ConsumerGroupId as CgIdNewtype;
-
-        let metadata = self.load();
-        metadata
-            .streams
-            .get(stream_id)
-            .and_then(|s| s.topics.get(topic_id))
-            .and_then(|t| t.partitions.get(partition_id))
-            .and_then(|p| {
-                let guard = p.last_polled_offsets.pin();
-                guard
-                    .get(&CgIdNewtype(group_id))
-                    .map(|v| v.load(Ordering::Relaxed))
-            })
-    }
-
     pub fn users_count(&self) -> usize {
         self.load().users.len()
     }
diff --git a/core/server/src/metadata/writer.rs 
b/core/server/src/metadata/writer.rs
index 3afa38d5f..4712679c8 100644
--- a/core/server/src/metadata/writer.rs
+++ b/core/server/src/metadata/writer.rs
@@ -302,6 +302,7 @@ impl MetadataWriter {
         self.publish();
     }
 
+    #[allow(clippy::too_many_arguments)]
     pub fn complete_partition_revocation(
         &mut self,
         stream_id: StreamId,
@@ -310,6 +311,7 @@ impl MetadataWriter {
         member_slab_id: usize,
         member_id: usize,
         partition_id: PartitionId,
+        timed_out: bool,
     ) {
         self.append(MetadataOp::CompletePartitionRevocation {
             stream_id,
@@ -318,6 +320,7 @@ impl MetadataWriter {
             member_slab_id,
             member_id,
             partition_id,
+            timed_out,
         });
         self.publish();
     }
diff --git a/core/server/src/shard/handlers.rs 
b/core/server/src/shard/handlers.rs
index 7cbbb8368..505eb50f1 100644
--- a/core/server/src/shard/handlers.rs
+++ b/core/server/src/shard/handlers.rs
@@ -359,6 +359,7 @@ async fn handle_request(
             member_slab_id,
             member_id,
             partition_id,
+            timed_out,
         } => {
             assert_eq!(
                 shard.id, 0,
@@ -372,6 +373,7 @@ async fn handle_request(
                 member_slab_id,
                 member_id,
                 partition_id,
+                timed_out,
             );
 
             Ok(ShardResponse::CompletePartitionRevocationResponse)
diff --git a/core/server/src/shard/system/consumer_groups.rs 
b/core/server/src/shard/system/consumer_groups.rs
index 505bcd776..327ba2b14 100644
--- a/core/server/src/shard/system/consumer_groups.rs
+++ b/core/server/src/shard/system/consumer_groups.rs
@@ -117,6 +117,7 @@ impl IggyShard {
                 revocation.slab_id,
                 revocation.member_id,
                 revocation.partition_id,
+                false,
             );
         }
 
diff --git a/core/server/src/shard/system/consumer_offsets.rs 
b/core/server/src/shard/system/consumer_offsets.rs
index e42cffc94..d03d7971b 100644
--- a/core/server/src/shard/system/consumer_offsets.rs
+++ b/core/server/src/shard/system/consumer_offsets.rs
@@ -298,7 +298,7 @@ impl IggyShard {
                     let path = format!("{}/{}", dir_path, id);
                     ConsumerOffset::new(ConsumerKind::Consumer, *id as u32, 
offset, path)
                 });
-                entry.offset.store(offset, Ordering::Relaxed);
+                entry.offset.store(offset, Ordering::Release);
             }
             PollingConsumer::ConsumerGroup(cg_id, _) => {
                 let Some(offsets) = 
self.metadata.get_partition_consumer_group_offsets(
@@ -319,7 +319,7 @@ impl IggyShard {
                     let path = format!("{}/{}", dir_path, cg_id.0);
                     ConsumerOffset::new(ConsumerKind::ConsumerGroup, cg_id.0 
as u32, offset, path)
                 });
-                entry.offset.store(offset, Ordering::Relaxed);
+                entry.offset.store(offset, Ordering::Release);
             }
         }
     }
@@ -449,7 +449,7 @@ impl IggyShard {
             let partition = topic.partitions.get(partition_id)?;
             let last_polled = {
                 let guard = partition.last_polled_offsets.pin();
-                guard.get(group_id).map(|v| v.load(Ordering::Relaxed))
+                guard.get(group_id).map(|v| v.load(Ordering::Acquire))
             };
             let can_complete = match last_polled {
                 None => true,
@@ -457,7 +457,7 @@ impl IggyShard {
                     let guard = partition.consumer_group_offsets.pin();
                     guard
                         .get(group_id)
-                        .map(|co| co.offset.load(Ordering::Relaxed))
+                        .map(|co| co.offset.load(Ordering::Acquire))
                         .is_some_and(|c| c >= polled)
                 }
             };
@@ -473,6 +473,7 @@ impl IggyShard {
                     member_slab_id: member_id.0,
                     member_id: logical_member_id,
                     partition_id,
+                    timed_out: false,
                 });
             let _ = self.send_to_control_plane(request).await;
         }
diff --git a/core/server/src/shard/system/messages.rs 
b/core/server/src/shard/system/messages.rs
index 68daaf741..d8ce2d891 100644
--- a/core/server/src/shard/system/messages.rs
+++ b/core/server/src/shard/system/messages.rs
@@ -265,7 +265,7 @@ impl IggyShard {
                             ),
                         ),
                     );
-                    item.offset.store(offset, Ordering::Relaxed);
+                    item.offset.store(offset, Ordering::Release);
                     (item.offset.load(Ordering::Relaxed), item.path.clone())
                 }
                 PollingConsumer::ConsumerGroup(consumer_group_id, _) => {
@@ -287,7 +287,7 @@ impl IggyShard {
                             ),
                         ),
                     );
-                    item.offset.store(offset, Ordering::Relaxed);
+                    item.offset.store(offset, Ordering::Release);
                     (item.offset.load(Ordering::Relaxed), item.path.clone())
                 }
             }
diff --git a/core/server/src/shard/tasks/periodic/revocation_timeout.rs 
b/core/server/src/shard/tasks/periodic/revocation_timeout.rs
index d9b5c7abd..21dc68cae 100644
--- a/core/server/src/shard/tasks/periodic/revocation_timeout.rs
+++ b/core/server/src/shard/tasks/periodic/revocation_timeout.rs
@@ -96,6 +96,7 @@ async fn check_revocation_timeouts(shard: Rc<IggyShard>) -> 
Result<(), IggyError
                 member_slab_id,
                 member_id,
                 partition_id,
+                timed_out: true,
             });
         let _ = shard.send_to_control_plane(request).await;
     }
diff --git a/core/server/src/shard/transmission/message.rs 
b/core/server/src/shard/transmission/message.rs
index 7fbaddf37..b533eddf1 100644
--- a/core/server/src/shard/transmission/message.rs
+++ b/core/server/src/shard/transmission/message.rs
@@ -229,6 +229,7 @@ pub enum ShardRequestPayload {
         member_slab_id: usize,
         member_id: usize,
         partition_id: usize,
+        timed_out: bool,
     },
 
     // Control-plane: PAT operations

Reply via email to