This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch stale-client-fix
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit fdc5f919f5fe5b1eeb33ae1daba125b92fa13490
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Wed Jan 7 15:32:53 2026 +0100

    fix(server,sdk): handle stale client in consumer group operations
    
    Server: check client existence before consumer group member lookup,
    return StaleClient instead of ConsumerGroupMemberNotFound when client
    was removed by heartbeat verifier mid-request.
    
    SDK: improve flag synchronization in IggyConsumer for reconnection;
    poll now waits for rejoin to complete before sending requests.
    
    Add integration tests for stale client recovery with consumer groups.
---
 core/integration/tests/server/scenarios/mod.rs     |   1 +
 .../stale_client_consumer_group_scenario.rs        | 306 +++++++++++++++++++++
 core/sdk/src/clients/consumer.rs                   |  19 +-
 .../src/binary/handlers/system/get_me_handler.rs   |   2 +-
 core/server/src/shard/system/utils.rs              |   9 +
 .../server/src/streaming/clients/client_manager.rs |  10 +-
 6 files changed, 339 insertions(+), 8 deletions(-)

diff --git a/core/integration/tests/server/scenarios/mod.rs b/core/integration/tests/server/scenarios/mod.rs
index a940c5155..57ac2ddaa 100644
--- a/core/integration/tests/server/scenarios/mod.rs
+++ b/core/integration/tests/server/scenarios/mod.rs
@@ -29,6 +29,7 @@ pub mod delete_segments_scenario;
 pub mod encryption_scenario;
 pub mod message_headers_scenario;
 pub mod message_size_scenario;
+pub mod stale_client_consumer_group_scenario;
 pub mod stream_size_validation_scenario;
 pub mod system_scenario;
 pub mod tcp_tls_scenario;
diff --git a/core/integration/tests/server/scenarios/stale_client_consumer_group_scenario.rs b/core/integration/tests/server/scenarios/stale_client_consumer_group_scenario.rs
new file mode 100644
index 000000000..27e199e39
--- /dev/null
+++ b/core/integration/tests/server/scenarios/stale_client_consumer_group_scenario.rs
@@ -0,0 +1,306 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use futures::StreamExt;
+use iggy::prelude::*;
+use iggy_common::Credentials;
+use integration::test_server::{IpAddrKind, TestServer};
+use serial_test::parallel;
+use std::collections::HashMap;
+use std::str::FromStr;
+use std::sync::Arc;
+use std::time::Duration;
+use tokio::time::sleep;
+
+const STREAM_NAME: &str = "stale-test-stream";
+const TOPIC_NAME: &str = "stale-test-topic";
+const CONSUMER_GROUP_NAME: &str = "stale-test-cg";
+const PARTITIONS_COUNT: u32 = 1;
+const TOTAL_MESSAGES: u32 = 10;
+
+fn create_test_server() -> TestServer {
+    let mut extra_envs = HashMap::new();
+    extra_envs.insert("IGGY_HEARTBEAT_ENABLED".to_string(), 
"true".to_string());
+    extra_envs.insert("IGGY_HEARTBEAT_INTERVAL".to_string(), "2s".to_string());
+    extra_envs.insert(
+        "IGGY_TCP_SOCKET_OVERRIDE_DEFAULTS".to_string(),
+        "true".to_string(),
+    );
+    extra_envs.insert("IGGY_TCP_SOCKET_NODELAY".to_string(), 
"true".to_string());
+    TestServer::new(Some(extra_envs), true, None, IpAddrKind::V4)
+}
+
+async fn create_client(server_addr: &str, heartbeat_interval: &str) -> 
IggyClient {
+    let config = TcpClientConfig {
+        server_address: server_addr.to_string(),
+        heartbeat_interval: 
IggyDuration::from_str(heartbeat_interval).unwrap(),
+        nodelay: true,
+        ..TcpClientConfig::default()
+    };
+    let client = TcpClient::create(Arc::new(config)).unwrap();
+    Client::connect(&client).await.unwrap();
+    IggyClient::create(ClientWrapper::Tcp(client), None, None)
+}
+
+async fn create_reconnecting_client(server_addr: &str) -> IggyClient {
+    let config = TcpClientConfig {
+        server_address: server_addr.to_string(),
+        heartbeat_interval: IggyDuration::from_str("1h").unwrap(),
+        nodelay: true,
+        auto_login: AutoLogin::Enabled(Credentials::UsernamePassword(
+            DEFAULT_ROOT_USERNAME.to_string(),
+            DEFAULT_ROOT_PASSWORD.to_string(),
+        )),
+        reconnection: TcpClientReconnectionConfig {
+            enabled: true,
+            max_retries: Some(5),
+            interval: IggyDuration::from_str("500ms").unwrap(),
+            reestablish_after: IggyDuration::from_str("100ms").unwrap(),
+        },
+        ..TcpClientConfig::default()
+    };
+    let client = TcpClient::create(Arc::new(config)).unwrap();
+    Client::connect(&client).await.unwrap();
+    IggyClient::create(ClientWrapper::Tcp(client), None, None)
+}
+
+async fn setup_resources(client: &IggyClient) {
+    client
+        .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+        .await
+        .unwrap();
+
+    client.create_stream(STREAM_NAME).await.unwrap();
+
+    client
+        .create_topic(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            TOPIC_NAME,
+            PARTITIONS_COUNT,
+            CompressionAlgorithm::default(),
+            None,
+            IggyExpiry::NeverExpire,
+            MaxTopicSize::ServerDefault,
+        )
+        .await
+        .unwrap();
+
+    client
+        .create_consumer_group(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            &Identifier::named(TOPIC_NAME).unwrap(),
+            CONSUMER_GROUP_NAME,
+        )
+        .await
+        .unwrap();
+
+    for i in 0..TOTAL_MESSAGES {
+        let message = IggyMessage::from_str(&format!("message-{i}")).unwrap();
+        let mut messages = vec![message];
+        client
+            .send_messages(
+                &Identifier::named(STREAM_NAME).unwrap(),
+                &Identifier::named(TOPIC_NAME).unwrap(),
+                &Partitioning::partition_id(0),
+                &mut messages,
+            )
+            .await
+            .unwrap();
+    }
+}
+
+/// Tests that a stale client receives clean errors and can manually reconnect.
+#[tokio::test]
+#[parallel]
+async fn should_handle_stale_client_with_manual_reconnection() {
+    let mut test_server = create_test_server();
+    test_server.start();
+    let server_addr = test_server.get_raw_tcp_addr().unwrap();
+
+    let setup_client = create_client(&server_addr, "500ms").await;
+    setup_resources(&setup_client).await;
+
+    // Client with 1h heartbeat will become stale
+    let stale_client = create_client(&server_addr, "1h").await;
+    stale_client
+        .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+        .await
+        .unwrap();
+
+    stale_client
+        .join_consumer_group(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            &Identifier::named(TOPIC_NAME).unwrap(),
+            &Identifier::named(CONSUMER_GROUP_NAME).unwrap(),
+        )
+        .await
+        .unwrap();
+
+    let consumer = 
Consumer::group(Identifier::named(CONSUMER_GROUP_NAME).unwrap());
+
+    // Poll first 5 messages
+    let mut messages_polled = 0;
+    while messages_polled < 5 {
+        let polled = stale_client
+            .poll_messages(
+                &Identifier::named(STREAM_NAME).unwrap(),
+                &Identifier::named(TOPIC_NAME).unwrap(),
+                None,
+                &consumer,
+                &PollingStrategy::next(),
+                1,
+                true,
+            )
+            .await
+            .unwrap();
+        messages_polled += polled.messages.len();
+    }
+    assert_eq!(messages_polled, 5);
+
+    // Wait for heartbeat timeout (2s * 1.2 = 2.4s threshold)
+    sleep(Duration::from_secs(4)).await;
+
+    // Should get error after stale detection
+    let mut got_error = false;
+    for _ in 0..3 {
+        if stale_client
+            .poll_messages(
+                &Identifier::named(STREAM_NAME).unwrap(),
+                &Identifier::named(TOPIC_NAME).unwrap(),
+                None,
+                &consumer,
+                &PollingStrategy::next(),
+                1,
+                true,
+            )
+            .await
+            .is_err()
+        {
+            got_error = true;
+            break;
+        }
+        sleep(Duration::from_millis(100)).await;
+    }
+    assert!(got_error, "Expected error after heartbeat timeout");
+
+    // Reconnect with new client
+    drop(stale_client);
+    let new_client = create_client(&server_addr, "500ms").await;
+    new_client
+        .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+        .await
+        .unwrap();
+    new_client
+        .join_consumer_group(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            &Identifier::named(TOPIC_NAME).unwrap(),
+            &Identifier::named(CONSUMER_GROUP_NAME).unwrap(),
+        )
+        .await
+        .unwrap();
+
+    // Poll remaining messages
+    let mut remaining_polled = 0;
+    let start = std::time::Instant::now();
+    while remaining_polled < 5 && start.elapsed() < Duration::from_secs(5) {
+        match new_client
+            .poll_messages(
+                &Identifier::named(STREAM_NAME).unwrap(),
+                &Identifier::named(TOPIC_NAME).unwrap(),
+                None,
+                &consumer,
+                &PollingStrategy::next(),
+                1,
+                true,
+            )
+            .await
+        {
+            Ok(polled) => remaining_polled += polled.messages.len(),
+            Err(_) => sleep(Duration::from_millis(100)).await,
+        }
+    }
+    assert_eq!(remaining_polled, 5);
+
+    let _ = setup_client
+        .delete_stream(&Identifier::named(STREAM_NAME).unwrap())
+        .await;
+    test_server.stop();
+}
+
+/// Tests that IggyConsumer automatically recovers after stale disconnect.
+#[tokio::test]
+#[parallel]
+async fn should_handle_stale_client_with_auto_reconnection() {
+    let mut test_server = create_test_server();
+    test_server.start();
+    let server_addr = test_server.get_raw_tcp_addr().unwrap();
+
+    let setup_client = create_client(&server_addr, "500ms").await;
+    setup_resources(&setup_client).await;
+
+    let consumer_client = create_reconnecting_client(&server_addr).await;
+    consumer_client
+        .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+        .await
+        .unwrap();
+
+    let mut consumer: IggyConsumer = consumer_client
+        .consumer_group(CONSUMER_GROUP_NAME, STREAM_NAME, TOPIC_NAME)
+        .unwrap()
+        .batch_length(1)
+        .poll_interval(IggyDuration::from_str("100ms").unwrap())
+        .polling_strategy(PollingStrategy::next())
+        .auto_join_consumer_group()
+        .create_consumer_group_if_not_exists()
+        .auto_commit(AutoCommit::When(AutoCommitWhen::PollingMessages))
+        .polling_retry_interval(IggyDuration::from_str("500ms").unwrap())
+        .build();
+
+    consumer.init().await.unwrap();
+
+    let mut messages_consumed = 0u32;
+    let start = std::time::Instant::now();
+    let timeout = Duration::from_secs(15);
+
+    while messages_consumed < TOTAL_MESSAGES && start.elapsed() < timeout {
+        let poll_result: Option<Result<ReceivedMessage, IggyError>> =
+            tokio::time::timeout(Duration::from_millis(500), consumer.next())
+                .await
+                .ok()
+                .flatten();
+
+        if let Some(Ok(_)) = poll_result {
+            messages_consumed += 1;
+            if messages_consumed == 5 {
+                // Sleep longer than heartbeat threshold (2s * 1.2 = 2.4s) to trigger staleness
+                sleep(Duration::from_millis(4000)).await;
+            }
+        }
+    }
+
+    drop(consumer);
+    let _ = setup_client
+        .delete_stream(&Identifier::named(STREAM_NAME).unwrap())
+        .await;
+    test_server.stop();
+
+    assert_eq!(
+        messages_consumed, TOTAL_MESSAGES,
+        "Should consume all messages after automatic reconnection"
+    );
+}
diff --git a/core/sdk/src/clients/consumer.rs b/core/sdk/src/clients/consumer.rs
index 3c2a23666..3d7784938 100644
--- a/core/sdk/src/clients/consumer.rs
+++ b/core/sdk/src/clients/consumer.rs
@@ -655,14 +655,22 @@ impl IggyConsumer {
         let last_stored_offset = self.last_stored_offsets.clone();
         let last_consumed_offset = self.last_consumed_offsets.clone();
         let allow_replay = self.allow_replay;
+        let is_consumer_group = self.is_consumer_group;
+        let joined_consumer_group = self.joined_consumer_group.clone();
 
         async move {
             if interval > 0 {
                 Self::wait_before_polling(interval, 
last_polled_at.load(ORDERING)).await;
             }
 
-            if !can_poll.load(ORDERING) {
-                trace!("Trying to poll messages in {retry_interval}...");
+            while !can_poll.load(ORDERING)
+                || (is_consumer_group && !joined_consumer_group.load(ORDERING))
+            {
+                trace!(
+                    "Cannot poll yet (can_poll={}, joined_cg={}), waiting 
{retry_interval}...",
+                    can_poll.load(ORDERING),
+                    joined_consumer_group.load(ORDERING)
+                );
                 sleep(retry_interval.get_duration()).await;
             }
 
@@ -772,10 +780,17 @@ impl IggyConsumer {
 
             let error = polled_messages.unwrap_err();
             error!("Failed to poll messages: {error}");
+
+            // Handle connection/auth errors - disable polling until event task re-enables
+            // it after reconnection and rejoin complete
             if matches!(
                 error,
                 IggyError::Disconnected | IggyError::Unauthenticated | 
IggyError::StaleClient
             ) {
+                can_poll.store(false, ORDERING);
+                if is_consumer_group {
+                    joined_consumer_group.store(false, ORDERING);
+                }
                 trace!("Retrying to poll messages in {retry_interval}...");
                 sleep(retry_interval.get_duration()).await;
             }
diff --git a/core/server/src/binary/handlers/system/get_me_handler.rs b/core/server/src/binary/handlers/system/get_me_handler.rs
index 339063ab3..a80eac407 100644
--- a/core/server/src/binary/handlers/system/get_me_handler.rs
+++ b/core/server/src/binary/handlers/system/get_me_handler.rs
@@ -50,7 +50,7 @@ impl ServerCommandHandler for GetMe {
             },
         )?
         else {
-            return Err(IggyError::ClientNotFound(session.client_id));
+            return Err(IggyError::StaleClient);
         };
 
         let bytes = mapper::map_client(&client);
diff --git a/core/server/src/shard/system/utils.rs b/core/server/src/shard/system/utils.rs
index 2eb8f55d5..e7ad661b1 100644
--- a/core/server/src/shard/system/utils.rs
+++ b/core/server/src/shard/system/utils.rs
@@ -133,6 +133,11 @@ impl IggyShard {
                 )))
             }
             ConsumerKind::ConsumerGroup => {
+                // Client may have been removed by heartbeat verifier while request was in-flight
+                if self.client_manager.try_get_client(client_id).is_none() {
+                    return Err(IggyError::StaleClient);
+                }
+
                 self.ensure_consumer_group_exists(stream_id, topic_id, 
&consumer.id)?;
                 let cg_id = self.streams.with_consumer_group_by_id(
                     stream_id,
@@ -146,6 +151,10 @@ impl IggyShard {
                     &consumer.id,
                     topics::helpers::get_consumer_group_member_id(client_id),
                 ) else {
+                    // Client might have been removed between check above and here
+                    if self.client_manager.try_get_client(client_id).is_none() 
{
+                        return Err(IggyError::StaleClient);
+                    }
                     return Err(IggyError::ConsumerGroupMemberNotFound(
                         client_id,
                         consumer.id.clone(),
diff --git a/core/server/src/streaming/clients/client_manager.rs b/core/server/src/streaming/clients/client_manager.rs
index 349980b55..36461cc4b 100644
--- a/core/server/src/streaming/clients/client_manager.rs
+++ b/core/server/src/streaming/clients/client_manager.rs
@@ -77,7 +77,7 @@ impl ClientManager {
     pub fn set_user_id(&self, client_id: u32, user_id: UserId) -> Result<(), 
IggyError> {
         self.clients
             .get_mut(&client_id)
-            .ok_or(IggyError::ClientNotFound(client_id))?
+            .ok_or(IggyError::StaleClient)?
             .user_id = Some(user_id);
         Ok(())
     }
@@ -85,7 +85,7 @@ impl ClientManager {
     pub fn clear_user_id(&self, client_id: u32) -> Result<(), IggyError> {
         self.clients
             .get_mut(&client_id)
-            .ok_or(IggyError::ClientNotFound(client_id))?
+            .ok_or(IggyError::StaleClient)?
             .user_id = None;
         Ok(())
     }
@@ -134,7 +134,7 @@ impl ClientManager {
         let mut client = self
             .clients
             .get_mut(&client_id)
-            .ok_or(IggyError::ClientNotFound(client_id))?;
+            .ok_or(IggyError::StaleClient)?;
         client.last_heartbeat = IggyTimestamp::now();
         Ok(())
     }
@@ -153,7 +153,7 @@ impl ClientManager {
         let mut client = self
             .clients
             .get_mut(&client_id)
-            .ok_or(IggyError::ClientNotFound(client_id))?;
+            .ok_or(IggyError::StaleClient)?;
 
         if client.consumer_groups.iter().any(|consumer_group| {
             consumer_group.group_id == group_id
@@ -185,7 +185,7 @@ impl ClientManager {
         let mut client = self
             .clients
             .get_mut(&client_id)
-            .ok_or(IggyError::ClientNotFound(client_id))?;
+            .ok_or(IggyError::StaleClient)?;
 
         if let Some(index) = 
client.consumer_groups.iter().position(|consumer_group| {
             consumer_group.stream_id == stream_id

Reply via email to