This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new 28c3f67da fix(server,sdk): handle stale client in consumer group operations (#2547)
28c3f67da is described below

commit 28c3f67da5074e7fe070836de8acd6d4b5638444
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Wed Jan 7 16:50:49 2026 +0100

    fix(server,sdk): handle stale client in consumer group operations (#2547)
    
    Server: check client existence before consumer group member lookup,
    return StaleClient instead of ConsumerGroupMemberNotFound
    when client was removed by heartbeat verifier mid-request.
    
    SDK: improve flag synchronization in IggyConsumer for
    reconnection, poll waits for rejoin to complete before sending requests.
    
    Add integration tests for stale client recovery with consumer groups.
    
    Co-authored-by: Piotr Gankiewicz <[email protected]>
---
 Cargo.lock                                         |   4 +-
 DEPENDENCIES.md                                    |   4 +-
 core/integration/tests/server/scenarios/mod.rs     |   1 +
 .../stale_client_consumer_group_scenario.rs        | 303 +++++++++++++++++++++
 core/sdk/Cargo.toml                                |   2 +-
 core/sdk/src/clients/consumer.rs                   |  19 +-
 core/server/Cargo.toml                             |   2 +-
 core/server/src/quic/listener.rs                   |   6 +-
 core/server/src/shard/system/utils.rs              |   9 +
 .../server/src/streaming/clients/client_manager.rs |   6 +-
 core/server/src/tcp/connection_handler.rs          |   8 +-
 core/server/src/websocket/connection_handler.rs    |   2 +-
 12 files changed, 346 insertions(+), 20 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index b37ab1d22..b8e260dc6 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4459,7 +4459,7 @@ dependencies = [
 
 [[package]]
 name = "iggy"
-version = "0.8.1-edge.2"
+version = "0.8.1-edge.3"
 dependencies = [
  "async-broadcast",
  "async-dropper",
@@ -8246,7 +8246,7 @@ dependencies = [
 
 [[package]]
 name = "server"
-version = "0.6.1-edge.2"
+version = "0.6.1-edge.3"
 dependencies = [
  "ahash 0.8.12",
  "anyhow",
diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md
index 88133f67a..8423e83d3 100644
--- a/DEPENDENCIES.md
+++ b/DEPENDENCIES.md
@@ -391,7 +391,7 @@ icu_provider: 2.1.1, "Unicode-3.0",
 ident_case: 1.0.1, "Apache-2.0 OR MIT",
 idna: 1.1.0, "Apache-2.0 OR MIT",
 idna_adapter: 1.2.1, "Apache-2.0 OR MIT",
-iggy: 0.8.1-edge.2, "Apache-2.0",
+iggy: 0.8.1-edge.3, "Apache-2.0",
 iggy-bench: 0.3.1-edge.1, "Apache-2.0",
 iggy-bench-dashboard-server: 0.5.1-edge.1, "Apache-2.0",
 iggy-cli: 0.10.1-edge.1, "Apache-2.0",
@@ -726,7 +726,7 @@ serde_with_macros: 3.16.1, "Apache-2.0 OR MIT",
 serde_yaml_ng: 0.10.0, "MIT",
 serial_test: 3.2.0, "MIT",
 serial_test_derive: 3.2.0, "MIT",
-server: 0.6.1-edge.2, "Apache-2.0",
+server: 0.6.1-edge.3, "Apache-2.0",
 sha1: 0.10.6, "Apache-2.0 OR MIT",
 sha2: 0.10.9, "Apache-2.0 OR MIT",
 sha3: 0.10.8, "Apache-2.0 OR MIT",
diff --git a/core/integration/tests/server/scenarios/mod.rs b/core/integration/tests/server/scenarios/mod.rs
index a940c5155..57ac2ddaa 100644
--- a/core/integration/tests/server/scenarios/mod.rs
+++ b/core/integration/tests/server/scenarios/mod.rs
@@ -29,6 +29,7 @@ pub mod delete_segments_scenario;
 pub mod encryption_scenario;
 pub mod message_headers_scenario;
 pub mod message_size_scenario;
+pub mod stale_client_consumer_group_scenario;
 pub mod stream_size_validation_scenario;
 pub mod system_scenario;
 pub mod tcp_tls_scenario;
diff --git a/core/integration/tests/server/scenarios/stale_client_consumer_group_scenario.rs b/core/integration/tests/server/scenarios/stale_client_consumer_group_scenario.rs
new file mode 100644
index 000000000..7784fb6ed
--- /dev/null
+++ b/core/integration/tests/server/scenarios/stale_client_consumer_group_scenario.rs
@@ -0,0 +1,303 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use futures::StreamExt;
+use iggy::prelude::*;
+use iggy_common::Credentials;
+use integration::test_server::{IpAddrKind, TestServer};
+use serial_test::parallel;
+use std::collections::HashMap;
+use std::str::FromStr;
+use std::sync::Arc;
+use std::time::Duration;
+use tokio::time::sleep;
+
+const STREAM_NAME: &str = "stale-test-stream";
+const TOPIC_NAME: &str = "stale-test-topic";
+const CONSUMER_GROUP_NAME: &str = "stale-test-cg";
+const PARTITIONS_COUNT: u32 = 1;
+const TOTAL_MESSAGES: u32 = 10;
+
+fn create_test_server() -> TestServer {
+    let mut extra_envs = HashMap::new();
+    extra_envs.insert("IGGY_HEARTBEAT_ENABLED".to_string(), "true".to_string());
+    extra_envs.insert("IGGY_HEARTBEAT_INTERVAL".to_string(), "2s".to_string());
+    extra_envs.insert(
+        "IGGY_TCP_SOCKET_OVERRIDE_DEFAULTS".to_string(),
+        "true".to_string(),
+    );
+    extra_envs.insert("IGGY_TCP_SOCKET_NODELAY".to_string(), "true".to_string());
+    TestServer::new(Some(extra_envs), true, None, IpAddrKind::V4)
+}
+
+async fn create_client(server_addr: &str, heartbeat_interval: &str) -> IggyClient {
+    let config = TcpClientConfig {
+        server_address: server_addr.to_string(),
+        heartbeat_interval: IggyDuration::from_str(heartbeat_interval).unwrap(),
+        nodelay: true,
+        ..TcpClientConfig::default()
+    };
+    let client = TcpClient::create(Arc::new(config)).unwrap();
+    Client::connect(&client).await.unwrap();
+    IggyClient::create(ClientWrapper::Tcp(client), None, None)
+}
+
+async fn create_reconnecting_client(server_addr: &str) -> IggyClient {
+    let config = TcpClientConfig {
+        server_address: server_addr.to_string(),
+        heartbeat_interval: IggyDuration::from_str("1h").unwrap(),
+        nodelay: true,
+        auto_login: AutoLogin::Enabled(Credentials::UsernamePassword(
+            DEFAULT_ROOT_USERNAME.to_string(),
+            DEFAULT_ROOT_PASSWORD.to_string(),
+        )),
+        reconnection: TcpClientReconnectionConfig {
+            enabled: true,
+            max_retries: Some(5),
+            interval: IggyDuration::from_str("500ms").unwrap(),
+            reestablish_after: IggyDuration::from_str("100ms").unwrap(),
+        },
+        ..TcpClientConfig::default()
+    };
+    let client = TcpClient::create(Arc::new(config)).unwrap();
+    Client::connect(&client).await.unwrap();
+    IggyClient::create(ClientWrapper::Tcp(client), None, None)
+}
+
+async fn setup_resources(client: &IggyClient) {
+    client
+        .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+        .await
+        .unwrap();
+
+    client.create_stream(STREAM_NAME).await.unwrap();
+
+    client
+        .create_topic(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            TOPIC_NAME,
+            PARTITIONS_COUNT,
+            CompressionAlgorithm::default(),
+            None,
+            IggyExpiry::NeverExpire,
+            MaxTopicSize::ServerDefault,
+        )
+        .await
+        .unwrap();
+
+    client
+        .create_consumer_group(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            &Identifier::named(TOPIC_NAME).unwrap(),
+            CONSUMER_GROUP_NAME,
+        )
+        .await
+        .unwrap();
+
+    for i in 0..TOTAL_MESSAGES {
+        let message = IggyMessage::from_str(&format!("message-{i}")).unwrap();
+        let mut messages = vec![message];
+        client
+            .send_messages(
+                &Identifier::named(STREAM_NAME).unwrap(),
+                &Identifier::named(TOPIC_NAME).unwrap(),
+                &Partitioning::partition_id(0),
+                &mut messages,
+            )
+            .await
+            .unwrap();
+    }
+}
+
+/// Tests that a stale client receives clean errors and can manually reconnect.
+#[tokio::test]
+#[parallel]
+async fn should_handle_stale_client_with_manual_reconnection() {
+    let mut test_server = create_test_server();
+    test_server.start();
+    let server_addr = test_server.get_raw_tcp_addr().unwrap();
+
+    let setup_client = create_client(&server_addr, "500ms").await;
+    setup_resources(&setup_client).await;
+
+    // Client with 1h heartbeat will become stale
+    let stale_client = create_client(&server_addr, "1h").await;
+    stale_client
+        .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+        .await
+        .unwrap();
+
+    stale_client
+        .join_consumer_group(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            &Identifier::named(TOPIC_NAME).unwrap(),
+            &Identifier::named(CONSUMER_GROUP_NAME).unwrap(),
+        )
+        .await
+        .unwrap();
+
+    let consumer = Consumer::group(Identifier::named(CONSUMER_GROUP_NAME).unwrap());
+
+    // Poll first 5 messages
+    let mut messages_polled = 0;
+    while messages_polled < 5 {
+        let polled = stale_client
+            .poll_messages(
+                &Identifier::named(STREAM_NAME).unwrap(),
+                &Identifier::named(TOPIC_NAME).unwrap(),
+                None,
+                &consumer,
+                &PollingStrategy::next(),
+                1,
+                true,
+            )
+            .await
+            .unwrap();
+        messages_polled += polled.messages.len();
+    }
+    assert_eq!(messages_polled, 5);
+
+    // Wait for heartbeat timeout (2s * 1.2 = 2.4s threshold)
+    sleep(Duration::from_secs(4)).await;
+
+    // Should get error after stale detection
+    let mut got_error = false;
+    for _ in 0..3 {
+        if stale_client
+            .poll_messages(
+                &Identifier::named(STREAM_NAME).unwrap(),
+                &Identifier::named(TOPIC_NAME).unwrap(),
+                None,
+                &consumer,
+                &PollingStrategy::next(),
+                1,
+                true,
+            )
+            .await
+            .is_err()
+        {
+            got_error = true;
+            break;
+        }
+        sleep(Duration::from_millis(100)).await;
+    }
+    assert!(got_error, "Expected error after heartbeat timeout");
+
+    // Reconnect with new client
+    drop(stale_client);
+    let new_client = create_client(&server_addr, "500ms").await;
+    new_client
+        .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+        .await
+        .unwrap();
+    new_client
+        .join_consumer_group(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            &Identifier::named(TOPIC_NAME).unwrap(),
+            &Identifier::named(CONSUMER_GROUP_NAME).unwrap(),
+        )
+        .await
+        .unwrap();
+
+    // Poll remaining messages
+    let mut remaining_polled = 0;
+    let start = std::time::Instant::now();
+    while remaining_polled < 5 && start.elapsed() < Duration::from_secs(5) {
+        match new_client
+            .poll_messages(
+                &Identifier::named(STREAM_NAME).unwrap(),
+                &Identifier::named(TOPIC_NAME).unwrap(),
+                None,
+                &consumer,
+                &PollingStrategy::next(),
+                1,
+                true,
+            )
+            .await
+        {
+            Ok(polled) => remaining_polled += polled.messages.len(),
+            Err(_) => sleep(Duration::from_millis(100)).await,
+        }
+    }
+    assert_eq!(remaining_polled, 5);
+
+    let _ = setup_client
+        .delete_stream(&Identifier::named(STREAM_NAME).unwrap())
+        .await;
+    test_server.stop();
+}
+
+/// Tests that IggyConsumer automatically recovers after stale disconnect.
+#[tokio::test]
+#[parallel]
+async fn should_handle_stale_client_with_auto_reconnection() {
+    let mut test_server = create_test_server();
+    test_server.start();
+    let server_addr = test_server.get_raw_tcp_addr().unwrap();
+
+    let setup_client = create_client(&server_addr, "500ms").await;
+    setup_resources(&setup_client).await;
+
+    let consumer_client = create_reconnecting_client(&server_addr).await;
+    // Note: auto_login is enabled in create_reconnecting_client, so no manual login needed
+
+    let mut consumer: IggyConsumer = consumer_client
+        .consumer_group(CONSUMER_GROUP_NAME, STREAM_NAME, TOPIC_NAME)
+        .unwrap()
+        .batch_length(1)
+        .poll_interval(IggyDuration::from_str("100ms").unwrap())
+        .polling_strategy(PollingStrategy::next())
+        .auto_join_consumer_group()
+        .create_consumer_group_if_not_exists()
+        .auto_commit(AutoCommit::When(AutoCommitWhen::PollingMessages))
+        .polling_retry_interval(IggyDuration::from_str("500ms").unwrap())
+        .build();
+
+    consumer.init().await.unwrap();
+
+    let mut messages_consumed = 0u32;
+    let start = std::time::Instant::now();
+    let timeout = Duration::from_secs(15);
+
+    while messages_consumed < TOTAL_MESSAGES && start.elapsed() < timeout {
+        let poll_result: Option<Result<ReceivedMessage, IggyError>> =
+            tokio::time::timeout(Duration::from_millis(500), consumer.next())
+                .await
+                .ok()
+                .flatten();
+
+        if let Some(Ok(_)) = poll_result {
+            messages_consumed += 1;
+            if messages_consumed == 5 {
+                // Sleep longer than heartbeat threshold (2s * 1.2 = 2.4s) to trigger staleness
+                sleep(Duration::from_millis(4000)).await;
+            }
+        }
+    }
+
+    drop(consumer);
+    let _ = setup_client
+        .delete_stream(&Identifier::named(STREAM_NAME).unwrap())
+        .await;
+    test_server.stop();
+
+    assert_eq!(
+        messages_consumed, TOTAL_MESSAGES,
+        "Should consume all messages after automatic reconnection"
+    );
+}
diff --git a/core/sdk/Cargo.toml b/core/sdk/Cargo.toml
index 3f65fcaff..21b0f7dfa 100644
--- a/core/sdk/Cargo.toml
+++ b/core/sdk/Cargo.toml
@@ -17,7 +17,7 @@
 
 [package]
 name = "iggy"
-version = "0.8.1-edge.2"
+version = "0.8.1-edge.3"
 description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second."
 edition = "2024"
 license = "Apache-2.0"
diff --git a/core/sdk/src/clients/consumer.rs b/core/sdk/src/clients/consumer.rs
index 3c2a23666..3d7784938 100644
--- a/core/sdk/src/clients/consumer.rs
+++ b/core/sdk/src/clients/consumer.rs
@@ -655,14 +655,22 @@ impl IggyConsumer {
         let last_stored_offset = self.last_stored_offsets.clone();
         let last_consumed_offset = self.last_consumed_offsets.clone();
         let allow_replay = self.allow_replay;
+        let is_consumer_group = self.is_consumer_group;
+        let joined_consumer_group = self.joined_consumer_group.clone();
 
         async move {
             if interval > 0 {
                 Self::wait_before_polling(interval, last_polled_at.load(ORDERING)).await;
             }
 
-            if !can_poll.load(ORDERING) {
-                trace!("Trying to poll messages in {retry_interval}...");
+            while !can_poll.load(ORDERING)
+                || (is_consumer_group && !joined_consumer_group.load(ORDERING))
+            {
+                trace!(
+                    "Cannot poll yet (can_poll={}, joined_cg={}), waiting {retry_interval}...",
+                    can_poll.load(ORDERING),
+                    joined_consumer_group.load(ORDERING)
+                );
                 sleep(retry_interval.get_duration()).await;
             }
 
@@ -772,10 +780,17 @@ impl IggyConsumer {
 
             let error = polled_messages.unwrap_err();
             error!("Failed to poll messages: {error}");
+
+            // Handle connection/auth errors - disable polling until event task re-enables
+            // it after reconnection and rejoin complete
             if matches!(
                 error,
                 IggyError::Disconnected | IggyError::Unauthenticated | IggyError::StaleClient
             ) {
+                can_poll.store(false, ORDERING);
+                if is_consumer_group {
+                    joined_consumer_group.store(false, ORDERING);
+                }
                 trace!("Retrying to poll messages in {retry_interval}...");
                 sleep(retry_interval.get_duration()).await;
             }
diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml
index 41c7d88b0..cf5fad08c 100644
--- a/core/server/Cargo.toml
+++ b/core/server/Cargo.toml
@@ -17,7 +17,7 @@
 
 [package]
 name = "server"
-version = "0.6.1-edge.2"
+version = "0.6.1-edge.3"
 edition = "2024"
 license = "Apache-2.0"
 
diff --git a/core/server/src/quic/listener.rs b/core/server/src/quic/listener.rs
index 9457f8af9..67a2d4937 100644
--- a/core/server/src/quic/listener.rs
+++ b/core/server/src/quic/listener.rs
@@ -210,12 +210,12 @@ async fn handle_stream(
                     "Command was not handled successfully, session: {:?}, error: {e}.",
                     session
                 );
-                // Only return a connection-terminating error for client not found
-                if let IggyError::ClientNotFound(_) = e {
+                // Only return a connection-terminating error for client not found or stale
+                if matches!(e, IggyError::ClientNotFound(_) | IggyError::StaleClient) {
                     sender.send_error_response(e.clone()).await?;
                     trace!("QUIC error response was sent.");
                     error!("Session will be deleted.");
-                    Err(anyhow!("Client not found: {e}"))
+                    Err(anyhow!("Client invalid: {e}"))
                 } else {
                     // For all other errors, send response and continue the connection
                     sender.send_error_response(e).await?;
diff --git a/core/server/src/shard/system/utils.rs b/core/server/src/shard/system/utils.rs
index 2eb8f55d5..e7ad661b1 100644
--- a/core/server/src/shard/system/utils.rs
+++ b/core/server/src/shard/system/utils.rs
@@ -133,6 +133,11 @@ impl IggyShard {
                 )))
             }
             ConsumerKind::ConsumerGroup => {
+                // Client may have been removed by heartbeat verifier while request was in-flight
+                if self.client_manager.try_get_client(client_id).is_none() {
+                    return Err(IggyError::StaleClient);
+                }
+
                 self.ensure_consumer_group_exists(stream_id, topic_id, &consumer.id)?;
                 let cg_id = self.streams.with_consumer_group_by_id(
                     stream_id,
@@ -146,6 +151,10 @@ impl IggyShard {
                     &consumer.id,
                     topics::helpers::get_consumer_group_member_id(client_id),
                 ) else {
+                    // Client might have been removed between check above and here
+                    if self.client_manager.try_get_client(client_id).is_none() {
+                        return Err(IggyError::StaleClient);
+                    }
                     return Err(IggyError::ConsumerGroupMemberNotFound(
                         client_id,
                         consumer.id.clone(),
diff --git a/core/server/src/streaming/clients/client_manager.rs b/core/server/src/streaming/clients/client_manager.rs
index 349980b55..3378fbee1 100644
--- a/core/server/src/streaming/clients/client_manager.rs
+++ b/core/server/src/streaming/clients/client_manager.rs
@@ -134,7 +134,7 @@ impl ClientManager {
         let mut client = self
             .clients
             .get_mut(&client_id)
-            .ok_or(IggyError::ClientNotFound(client_id))?;
+            .ok_or(IggyError::StaleClient)?;
         client.last_heartbeat = IggyTimestamp::now();
         Ok(())
     }
@@ -153,7 +153,7 @@ impl ClientManager {
         let mut client = self
             .clients
             .get_mut(&client_id)
-            .ok_or(IggyError::ClientNotFound(client_id))?;
+            .ok_or(IggyError::StaleClient)?;
 
         if client.consumer_groups.iter().any(|consumer_group| {
             consumer_group.group_id == group_id
@@ -185,7 +185,7 @@ impl ClientManager {
         let mut client = self
             .clients
             .get_mut(&client_id)
-            .ok_or(IggyError::ClientNotFound(client_id))?;
+            .ok_or(IggyError::StaleClient)?;
 
         if let Some(index) = client.consumer_groups.iter().position(|consumer_group| {
             consumer_group.stream_id == stream_id
diff --git a/core/server/src/tcp/connection_handler.rs b/core/server/src/tcp/connection_handler.rs
index 3e19038c8..d3a58db57 100644
--- a/core/server/src/tcp/connection_handler.rs
+++ b/core/server/src/tcp/connection_handler.rs
@@ -121,13 +121,11 @@ pub(crate) async fn handle_connection(
                         "Command with code {cmd_code} was not handled successfully, session: {session}, error: {error}."
                     );
 
-                    if let IggyError::ClientNotFound(_) = error {
-                        sender.send_error_response(error).await?;
+                    if matches!(error, IggyError::ClientNotFound(_) | IggyError::StaleClient) {
+                        sender.send_error_response(error.clone()).await?;
                         debug!("TCP error response was sent to: {session}.");
                         error!("Session: {session} will be deleted.");
-                        return Err(ConnectionError::from(IggyError::ClientNotFound(
-                            session.client_id,
-                        )));
+                        return Err(ConnectionError::from(error));
                     } else {
                         sender.send_error_response(error).await?;
                         debug!("TCP error response was sent to: {session}.");
diff --git a/core/server/src/websocket/connection_handler.rs b/core/server/src/websocket/connection_handler.rs
index 8209d502f..2ee97196f 100644
--- a/core/server/src/websocket/connection_handler.rs
+++ b/core/server/src/websocket/connection_handler.rs
@@ -96,7 +96,7 @@ pub(crate) async fn handle_connection(
                         );
                         return Err(ConnectionError::from(IggyError::ConnectionClosed));
                     }
-                    IggyError::ClientNotFound(_) => {
+                    IggyError::ClientNotFound(_) | IggyError::StaleClient => {
                         error!("Command failed for session: {session}, error: {error}.");
                         sender.send_error_response(error.clone()).await?;
                         return Err(ConnectionError::from(error));

Reply via email to