This is an automated email from the ASF dual-hosted git repository.
hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 28c3f67da fix(server,sdk): handle stale client in consumer group operations (#2547)
28c3f67da is described below
commit 28c3f67da5074e7fe070836de8acd6d4b5638444
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Wed Jan 7 16:50:49 2026 +0100
fix(server,sdk): handle stale client in consumer group operations (#2547)
Server: check client existence before consumer group member lookup,
return StaleClient instead of ConsumerGroupMemberNotFound
when client was removed by heartbeat verifier mid-request.
SDK: improve flag synchronization in IggyConsumer for
reconnection; poll now waits for rejoin to complete before sending requests.
Add integration tests for stale client recovery with consumer groups.
Co-authored-by: Piotr Gankiewicz <[email protected]>
---
Cargo.lock | 4 +-
DEPENDENCIES.md | 4 +-
core/integration/tests/server/scenarios/mod.rs | 1 +
.../stale_client_consumer_group_scenario.rs | 303 +++++++++++++++++++++
core/sdk/Cargo.toml | 2 +-
core/sdk/src/clients/consumer.rs | 19 +-
core/server/Cargo.toml | 2 +-
core/server/src/quic/listener.rs | 6 +-
core/server/src/shard/system/utils.rs | 9 +
.../server/src/streaming/clients/client_manager.rs | 6 +-
core/server/src/tcp/connection_handler.rs | 8 +-
core/server/src/websocket/connection_handler.rs | 2 +-
12 files changed, 346 insertions(+), 20 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index b37ab1d22..b8e260dc6 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4459,7 +4459,7 @@ dependencies = [
[[package]]
name = "iggy"
-version = "0.8.1-edge.2"
+version = "0.8.1-edge.3"
dependencies = [
"async-broadcast",
"async-dropper",
@@ -8246,7 +8246,7 @@ dependencies = [
[[package]]
name = "server"
-version = "0.6.1-edge.2"
+version = "0.6.1-edge.3"
dependencies = [
"ahash 0.8.12",
"anyhow",
diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md
index 88133f67a..8423e83d3 100644
--- a/DEPENDENCIES.md
+++ b/DEPENDENCIES.md
@@ -391,7 +391,7 @@ icu_provider: 2.1.1, "Unicode-3.0",
ident_case: 1.0.1, "Apache-2.0 OR MIT",
idna: 1.1.0, "Apache-2.0 OR MIT",
idna_adapter: 1.2.1, "Apache-2.0 OR MIT",
-iggy: 0.8.1-edge.2, "Apache-2.0",
+iggy: 0.8.1-edge.3, "Apache-2.0",
iggy-bench: 0.3.1-edge.1, "Apache-2.0",
iggy-bench-dashboard-server: 0.5.1-edge.1, "Apache-2.0",
iggy-cli: 0.10.1-edge.1, "Apache-2.0",
@@ -726,7 +726,7 @@ serde_with_macros: 3.16.1, "Apache-2.0 OR MIT",
serde_yaml_ng: 0.10.0, "MIT",
serial_test: 3.2.0, "MIT",
serial_test_derive: 3.2.0, "MIT",
-server: 0.6.1-edge.2, "Apache-2.0",
+server: 0.6.1-edge.3, "Apache-2.0",
sha1: 0.10.6, "Apache-2.0 OR MIT",
sha2: 0.10.9, "Apache-2.0 OR MIT",
sha3: 0.10.8, "Apache-2.0 OR MIT",
diff --git a/core/integration/tests/server/scenarios/mod.rs b/core/integration/tests/server/scenarios/mod.rs
index a940c5155..57ac2ddaa 100644
--- a/core/integration/tests/server/scenarios/mod.rs
+++ b/core/integration/tests/server/scenarios/mod.rs
@@ -29,6 +29,7 @@ pub mod delete_segments_scenario;
pub mod encryption_scenario;
pub mod message_headers_scenario;
pub mod message_size_scenario;
+pub mod stale_client_consumer_group_scenario;
pub mod stream_size_validation_scenario;
pub mod system_scenario;
pub mod tcp_tls_scenario;
diff --git a/core/integration/tests/server/scenarios/stale_client_consumer_group_scenario.rs b/core/integration/tests/server/scenarios/stale_client_consumer_group_scenario.rs
new file mode 100644
index 000000000..7784fb6ed
--- /dev/null
+++ b/core/integration/tests/server/scenarios/stale_client_consumer_group_scenario.rs
@@ -0,0 +1,303 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use futures::StreamExt;
+use iggy::prelude::*;
+use iggy_common::Credentials;
+use integration::test_server::{IpAddrKind, TestServer};
+use serial_test::parallel;
+use std::collections::HashMap;
+use std::str::FromStr;
+use std::sync::Arc;
+use std::time::Duration;
+use tokio::time::sleep;
+
+const STREAM_NAME: &str = "stale-test-stream";
+const TOPIC_NAME: &str = "stale-test-topic";
+const CONSUMER_GROUP_NAME: &str = "stale-test-cg";
+const PARTITIONS_COUNT: u32 = 1;
+const TOTAL_MESSAGES: u32 = 10;
+
+fn create_test_server() -> TestServer {
+ let mut extra_envs = HashMap::new();
+ extra_envs.insert("IGGY_HEARTBEAT_ENABLED".to_string(),
"true".to_string());
+ extra_envs.insert("IGGY_HEARTBEAT_INTERVAL".to_string(), "2s".to_string());
+ extra_envs.insert(
+ "IGGY_TCP_SOCKET_OVERRIDE_DEFAULTS".to_string(),
+ "true".to_string(),
+ );
+ extra_envs.insert("IGGY_TCP_SOCKET_NODELAY".to_string(),
"true".to_string());
+ TestServer::new(Some(extra_envs), true, None, IpAddrKind::V4)
+}
+
+async fn create_client(server_addr: &str, heartbeat_interval: &str) ->
IggyClient {
+ let config = TcpClientConfig {
+ server_address: server_addr.to_string(),
+ heartbeat_interval:
IggyDuration::from_str(heartbeat_interval).unwrap(),
+ nodelay: true,
+ ..TcpClientConfig::default()
+ };
+ let client = TcpClient::create(Arc::new(config)).unwrap();
+ Client::connect(&client).await.unwrap();
+ IggyClient::create(ClientWrapper::Tcp(client), None, None)
+}
+
+async fn create_reconnecting_client(server_addr: &str) -> IggyClient {
+ let config = TcpClientConfig {
+ server_address: server_addr.to_string(),
+ heartbeat_interval: IggyDuration::from_str("1h").unwrap(),
+ nodelay: true,
+ auto_login: AutoLogin::Enabled(Credentials::UsernamePassword(
+ DEFAULT_ROOT_USERNAME.to_string(),
+ DEFAULT_ROOT_PASSWORD.to_string(),
+ )),
+ reconnection: TcpClientReconnectionConfig {
+ enabled: true,
+ max_retries: Some(5),
+ interval: IggyDuration::from_str("500ms").unwrap(),
+ reestablish_after: IggyDuration::from_str("100ms").unwrap(),
+ },
+ ..TcpClientConfig::default()
+ };
+ let client = TcpClient::create(Arc::new(config)).unwrap();
+ Client::connect(&client).await.unwrap();
+ IggyClient::create(ClientWrapper::Tcp(client), None, None)
+}
+
+async fn setup_resources(client: &IggyClient) {
+ client
+ .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+ .await
+ .unwrap();
+
+ client.create_stream(STREAM_NAME).await.unwrap();
+
+ client
+ .create_topic(
+ &Identifier::named(STREAM_NAME).unwrap(),
+ TOPIC_NAME,
+ PARTITIONS_COUNT,
+ CompressionAlgorithm::default(),
+ None,
+ IggyExpiry::NeverExpire,
+ MaxTopicSize::ServerDefault,
+ )
+ .await
+ .unwrap();
+
+ client
+ .create_consumer_group(
+ &Identifier::named(STREAM_NAME).unwrap(),
+ &Identifier::named(TOPIC_NAME).unwrap(),
+ CONSUMER_GROUP_NAME,
+ )
+ .await
+ .unwrap();
+
+ for i in 0..TOTAL_MESSAGES {
+ let message = IggyMessage::from_str(&format!("message-{i}")).unwrap();
+ let mut messages = vec![message];
+ client
+ .send_messages(
+ &Identifier::named(STREAM_NAME).unwrap(),
+ &Identifier::named(TOPIC_NAME).unwrap(),
+ &Partitioning::partition_id(0),
+ &mut messages,
+ )
+ .await
+ .unwrap();
+ }
+}
+
+/// Tests that a stale client receives clean errors and can manually reconnect.
+#[tokio::test]
+#[parallel]
+async fn should_handle_stale_client_with_manual_reconnection() {
+ let mut test_server = create_test_server();
+ test_server.start();
+ let server_addr = test_server.get_raw_tcp_addr().unwrap();
+
+ let setup_client = create_client(&server_addr, "500ms").await;
+ setup_resources(&setup_client).await;
+
+ // Client with 1h heartbeat will become stale
+ let stale_client = create_client(&server_addr, "1h").await;
+ stale_client
+ .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+ .await
+ .unwrap();
+
+ stale_client
+ .join_consumer_group(
+ &Identifier::named(STREAM_NAME).unwrap(),
+ &Identifier::named(TOPIC_NAME).unwrap(),
+ &Identifier::named(CONSUMER_GROUP_NAME).unwrap(),
+ )
+ .await
+ .unwrap();
+
+ let consumer =
Consumer::group(Identifier::named(CONSUMER_GROUP_NAME).unwrap());
+
+ // Poll first 5 messages
+ let mut messages_polled = 0;
+ while messages_polled < 5 {
+ let polled = stale_client
+ .poll_messages(
+ &Identifier::named(STREAM_NAME).unwrap(),
+ &Identifier::named(TOPIC_NAME).unwrap(),
+ None,
+ &consumer,
+ &PollingStrategy::next(),
+ 1,
+ true,
+ )
+ .await
+ .unwrap();
+ messages_polled += polled.messages.len();
+ }
+ assert_eq!(messages_polled, 5);
+
+ // Wait for heartbeat timeout (2s * 1.2 = 2.4s threshold)
+ sleep(Duration::from_secs(4)).await;
+
+ // Should get error after stale detection
+ let mut got_error = false;
+ for _ in 0..3 {
+ if stale_client
+ .poll_messages(
+ &Identifier::named(STREAM_NAME).unwrap(),
+ &Identifier::named(TOPIC_NAME).unwrap(),
+ None,
+ &consumer,
+ &PollingStrategy::next(),
+ 1,
+ true,
+ )
+ .await
+ .is_err()
+ {
+ got_error = true;
+ break;
+ }
+ sleep(Duration::from_millis(100)).await;
+ }
+ assert!(got_error, "Expected error after heartbeat timeout");
+
+ // Reconnect with new client
+ drop(stale_client);
+ let new_client = create_client(&server_addr, "500ms").await;
+ new_client
+ .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+ .await
+ .unwrap();
+ new_client
+ .join_consumer_group(
+ &Identifier::named(STREAM_NAME).unwrap(),
+ &Identifier::named(TOPIC_NAME).unwrap(),
+ &Identifier::named(CONSUMER_GROUP_NAME).unwrap(),
+ )
+ .await
+ .unwrap();
+
+ // Poll remaining messages
+ let mut remaining_polled = 0;
+ let start = std::time::Instant::now();
+ while remaining_polled < 5 && start.elapsed() < Duration::from_secs(5) {
+ match new_client
+ .poll_messages(
+ &Identifier::named(STREAM_NAME).unwrap(),
+ &Identifier::named(TOPIC_NAME).unwrap(),
+ None,
+ &consumer,
+ &PollingStrategy::next(),
+ 1,
+ true,
+ )
+ .await
+ {
+ Ok(polled) => remaining_polled += polled.messages.len(),
+ Err(_) => sleep(Duration::from_millis(100)).await,
+ }
+ }
+ assert_eq!(remaining_polled, 5);
+
+ let _ = setup_client
+ .delete_stream(&Identifier::named(STREAM_NAME).unwrap())
+ .await;
+ test_server.stop();
+}
+
+/// Tests that IggyConsumer automatically recovers after stale disconnect.
+#[tokio::test]
+#[parallel]
+async fn should_handle_stale_client_with_auto_reconnection() {
+ let mut test_server = create_test_server();
+ test_server.start();
+ let server_addr = test_server.get_raw_tcp_addr().unwrap();
+
+ let setup_client = create_client(&server_addr, "500ms").await;
+ setup_resources(&setup_client).await;
+
+ let consumer_client = create_reconnecting_client(&server_addr).await;
+ // Note: auto_login is enabled in create_reconnecting_client, so no manual
login needed
+
+ let mut consumer: IggyConsumer = consumer_client
+ .consumer_group(CONSUMER_GROUP_NAME, STREAM_NAME, TOPIC_NAME)
+ .unwrap()
+ .batch_length(1)
+ .poll_interval(IggyDuration::from_str("100ms").unwrap())
+ .polling_strategy(PollingStrategy::next())
+ .auto_join_consumer_group()
+ .create_consumer_group_if_not_exists()
+ .auto_commit(AutoCommit::When(AutoCommitWhen::PollingMessages))
+ .polling_retry_interval(IggyDuration::from_str("500ms").unwrap())
+ .build();
+
+ consumer.init().await.unwrap();
+
+ let mut messages_consumed = 0u32;
+ let start = std::time::Instant::now();
+ let timeout = Duration::from_secs(15);
+
+ while messages_consumed < TOTAL_MESSAGES && start.elapsed() < timeout {
+ let poll_result: Option<Result<ReceivedMessage, IggyError>> =
+ tokio::time::timeout(Duration::from_millis(500), consumer.next())
+ .await
+ .ok()
+ .flatten();
+
+ if let Some(Ok(_)) = poll_result {
+ messages_consumed += 1;
+ if messages_consumed == 5 {
+ // Sleep longer than heartbeat threshold (2s * 1.2 = 2.4s) to
trigger staleness
+ sleep(Duration::from_millis(4000)).await;
+ }
+ }
+ }
+
+ drop(consumer);
+ let _ = setup_client
+ .delete_stream(&Identifier::named(STREAM_NAME).unwrap())
+ .await;
+ test_server.stop();
+
+ assert_eq!(
+ messages_consumed, TOTAL_MESSAGES,
+ "Should consume all messages after automatic reconnection"
+ );
+}
diff --git a/core/sdk/Cargo.toml b/core/sdk/Cargo.toml
index 3f65fcaff..21b0f7dfa 100644
--- a/core/sdk/Cargo.toml
+++ b/core/sdk/Cargo.toml
@@ -17,7 +17,7 @@
[package]
name = "iggy"
-version = "0.8.1-edge.2"
+version = "0.8.1-edge.3"
description = "Iggy is the persistent message streaming platform written in
Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing
millions of messages per second."
edition = "2024"
license = "Apache-2.0"
diff --git a/core/sdk/src/clients/consumer.rs b/core/sdk/src/clients/consumer.rs
index 3c2a23666..3d7784938 100644
--- a/core/sdk/src/clients/consumer.rs
+++ b/core/sdk/src/clients/consumer.rs
@@ -655,14 +655,22 @@ impl IggyConsumer {
let last_stored_offset = self.last_stored_offsets.clone();
let last_consumed_offset = self.last_consumed_offsets.clone();
let allow_replay = self.allow_replay;
+ let is_consumer_group = self.is_consumer_group;
+ let joined_consumer_group = self.joined_consumer_group.clone();
async move {
if interval > 0 {
Self::wait_before_polling(interval,
last_polled_at.load(ORDERING)).await;
}
- if !can_poll.load(ORDERING) {
- trace!("Trying to poll messages in {retry_interval}...");
+ while !can_poll.load(ORDERING)
+ || (is_consumer_group && !joined_consumer_group.load(ORDERING))
+ {
+ trace!(
+ "Cannot poll yet (can_poll={}, joined_cg={}), waiting
{retry_interval}...",
+ can_poll.load(ORDERING),
+ joined_consumer_group.load(ORDERING)
+ );
sleep(retry_interval.get_duration()).await;
}
@@ -772,10 +780,17 @@ impl IggyConsumer {
let error = polled_messages.unwrap_err();
error!("Failed to poll messages: {error}");
+
+ // Handle connection/auth errors - disable polling until event
task re-enables
+ // it after reconnection and rejoin complete
if matches!(
error,
IggyError::Disconnected | IggyError::Unauthenticated |
IggyError::StaleClient
) {
+ can_poll.store(false, ORDERING);
+ if is_consumer_group {
+ joined_consumer_group.store(false, ORDERING);
+ }
trace!("Retrying to poll messages in {retry_interval}...");
sleep(retry_interval.get_duration()).await;
}
diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml
index 41c7d88b0..cf5fad08c 100644
--- a/core/server/Cargo.toml
+++ b/core/server/Cargo.toml
@@ -17,7 +17,7 @@
[package]
name = "server"
-version = "0.6.1-edge.2"
+version = "0.6.1-edge.3"
edition = "2024"
license = "Apache-2.0"
diff --git a/core/server/src/quic/listener.rs b/core/server/src/quic/listener.rs
index 9457f8af9..67a2d4937 100644
--- a/core/server/src/quic/listener.rs
+++ b/core/server/src/quic/listener.rs
@@ -210,12 +210,12 @@ async fn handle_stream(
"Command was not handled successfully, session: {:?},
error: {e}.",
session
);
- // Only return a connection-terminating error for client not
found
- if let IggyError::ClientNotFound(_) = e {
+ // Only return a connection-terminating error for client not
found or stale
+ if matches!(e, IggyError::ClientNotFound(_) |
IggyError::StaleClient) {
sender.send_error_response(e.clone()).await?;
trace!("QUIC error response was sent.");
error!("Session will be deleted.");
- Err(anyhow!("Client not found: {e}"))
+ Err(anyhow!("Client invalid: {e}"))
} else {
// For all other errors, send response and continue the
connection
sender.send_error_response(e).await?;
diff --git a/core/server/src/shard/system/utils.rs b/core/server/src/shard/system/utils.rs
index 2eb8f55d5..e7ad661b1 100644
--- a/core/server/src/shard/system/utils.rs
+++ b/core/server/src/shard/system/utils.rs
@@ -133,6 +133,11 @@ impl IggyShard {
)))
}
ConsumerKind::ConsumerGroup => {
+ // Client may have been removed by heartbeat verifier while
request was in-flight
+ if self.client_manager.try_get_client(client_id).is_none() {
+ return Err(IggyError::StaleClient);
+ }
+
self.ensure_consumer_group_exists(stream_id, topic_id,
&consumer.id)?;
let cg_id = self.streams.with_consumer_group_by_id(
stream_id,
@@ -146,6 +151,10 @@ impl IggyShard {
&consumer.id,
topics::helpers::get_consumer_group_member_id(client_id),
) else {
+ // Client might have been removed between check above and
here
+ if self.client_manager.try_get_client(client_id).is_none()
{
+ return Err(IggyError::StaleClient);
+ }
return Err(IggyError::ConsumerGroupMemberNotFound(
client_id,
consumer.id.clone(),
diff --git a/core/server/src/streaming/clients/client_manager.rs b/core/server/src/streaming/clients/client_manager.rs
index 349980b55..3378fbee1 100644
--- a/core/server/src/streaming/clients/client_manager.rs
+++ b/core/server/src/streaming/clients/client_manager.rs
@@ -134,7 +134,7 @@ impl ClientManager {
let mut client = self
.clients
.get_mut(&client_id)
- .ok_or(IggyError::ClientNotFound(client_id))?;
+ .ok_or(IggyError::StaleClient)?;
client.last_heartbeat = IggyTimestamp::now();
Ok(())
}
@@ -153,7 +153,7 @@ impl ClientManager {
let mut client = self
.clients
.get_mut(&client_id)
- .ok_or(IggyError::ClientNotFound(client_id))?;
+ .ok_or(IggyError::StaleClient)?;
if client.consumer_groups.iter().any(|consumer_group| {
consumer_group.group_id == group_id
@@ -185,7 +185,7 @@ impl ClientManager {
let mut client = self
.clients
.get_mut(&client_id)
- .ok_or(IggyError::ClientNotFound(client_id))?;
+ .ok_or(IggyError::StaleClient)?;
if let Some(index) =
client.consumer_groups.iter().position(|consumer_group| {
consumer_group.stream_id == stream_id
diff --git a/core/server/src/tcp/connection_handler.rs b/core/server/src/tcp/connection_handler.rs
index 3e19038c8..d3a58db57 100644
--- a/core/server/src/tcp/connection_handler.rs
+++ b/core/server/src/tcp/connection_handler.rs
@@ -121,13 +121,11 @@ pub(crate) async fn handle_connection(
"Command with code {cmd_code} was not handled
successfully, session: {session}, error: {error}."
);
- if let IggyError::ClientNotFound(_) = error {
- sender.send_error_response(error).await?;
+ if matches!(error, IggyError::ClientNotFound(_) |
IggyError::StaleClient) {
+ sender.send_error_response(error.clone()).await?;
debug!("TCP error response was sent to: {session}.");
error!("Session: {session} will be deleted.");
- return
Err(ConnectionError::from(IggyError::ClientNotFound(
- session.client_id,
- )));
+ return Err(ConnectionError::from(error));
} else {
sender.send_error_response(error).await?;
debug!("TCP error response was sent to: {session}.");
diff --git a/core/server/src/websocket/connection_handler.rs b/core/server/src/websocket/connection_handler.rs
index 8209d502f..2ee97196f 100644
--- a/core/server/src/websocket/connection_handler.rs
+++ b/core/server/src/websocket/connection_handler.rs
@@ -96,7 +96,7 @@ pub(crate) async fn handle_connection(
);
return
Err(ConnectionError::from(IggyError::ConnectionClosed));
}
- IggyError::ClientNotFound(_) => {
+ IggyError::ClientNotFound(_) | IggyError::StaleClient => {
error!("Command failed for session: {session}, error:
{error}.");
sender.send_error_response(error.clone()).await?;
return Err(ConnectionError::from(error));