This is an automated email from the ASF dual-hosted git repository. hgruszecki pushed a commit to branch stale-client-fix in repository https://gitbox.apache.org/repos/asf/iggy.git
commit eb0c73b753dcb579d14c9dfd3aef8a60a798320e Author: Hubert Gruszecki <[email protected]> AuthorDate: Wed Jan 7 15:32:53 2026 +0100 fix(server,sdk): handle stale client in consumer group operations Server: check client existence before the consumer group member lookup and return StaleClient instead of ConsumerGroupMemberNotFound when the client was removed by the heartbeat verifier mid-request. SDK: improve flag synchronization in IggyConsumer for reconnection; polling now waits for the consumer group rejoin to complete before sending requests. Add integration tests for stale client recovery with consumer groups. --- core/integration/tests/server/scenarios/mod.rs | 1 + .../stale_client_consumer_group_scenario.rs | 305 +++++++++++++++++++++ core/sdk/src/clients/consumer.rs | 19 +- core/server/src/shard/system/utils.rs | 9 + 4 files changed, 332 insertions(+), 2 deletions(-) diff --git a/core/integration/tests/server/scenarios/mod.rs b/core/integration/tests/server/scenarios/mod.rs index a940c5155..57ac2ddaa 100644 --- a/core/integration/tests/server/scenarios/mod.rs +++ b/core/integration/tests/server/scenarios/mod.rs @@ -29,6 +29,7 @@ pub mod delete_segments_scenario; pub mod encryption_scenario; pub mod message_headers_scenario; pub mod message_size_scenario; +pub mod stale_client_consumer_group_scenario; pub mod stream_size_validation_scenario; pub mod system_scenario; pub mod tcp_tls_scenario; diff --git a/core/integration/tests/server/scenarios/stale_client_consumer_group_scenario.rs b/core/integration/tests/server/scenarios/stale_client_consumer_group_scenario.rs new file mode 100644 index 000000000..2021db2ef --- /dev/null +++ b/core/integration/tests/server/scenarios/stale_client_consumer_group_scenario.rs @@ -0,0 +1,305 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use futures::StreamExt; +use iggy::prelude::*; +use iggy_common::Credentials; +use integration::test_server::{IpAddrKind, TestServer}; +use serial_test::parallel; +use std::collections::HashMap; +use std::str::FromStr; +use std::sync::Arc; +use std::time::Duration; +use tokio::time::sleep; + +const STREAM_NAME: &str = "stale-test-stream"; +const TOPIC_NAME: &str = "stale-test-topic"; +const CONSUMER_GROUP_NAME: &str = "stale-test-cg"; +const PARTITIONS_COUNT: u32 = 1; +const TOTAL_MESSAGES: u32 = 10; + +fn create_test_server() -> TestServer { + let mut extra_envs = HashMap::new(); + extra_envs.insert("IGGY_HEARTBEAT_ENABLED".to_string(), "true".to_string()); + extra_envs.insert("IGGY_HEARTBEAT_INTERVAL".to_string(), "1s".to_string()); + extra_envs.insert( + "IGGY_TCP_SOCKET_OVERRIDE_DEFAULTS".to_string(), + "true".to_string(), + ); + extra_envs.insert("IGGY_TCP_SOCKET_NODELAY".to_string(), "true".to_string()); + TestServer::new(Some(extra_envs), true, None, IpAddrKind::V4) +} + +async fn create_client(server_addr: &str, heartbeat_interval: &str) -> IggyClient { + let config = TcpClientConfig { + server_address: server_addr.to_string(), + heartbeat_interval: IggyDuration::from_str(heartbeat_interval).unwrap(), + nodelay: true, + ..TcpClientConfig::default() + }; + let client = TcpClient::create(Arc::new(config)).unwrap(); + Client::connect(&client).await.unwrap(); + 
IggyClient::create(ClientWrapper::Tcp(client), None, None) +} + +async fn create_reconnecting_client(server_addr: &str) -> IggyClient { + let config = TcpClientConfig { + server_address: server_addr.to_string(), + heartbeat_interval: IggyDuration::from_str("1h").unwrap(), + nodelay: true, + auto_login: AutoLogin::Enabled(Credentials::UsernamePassword( + DEFAULT_ROOT_USERNAME.to_string(), + DEFAULT_ROOT_PASSWORD.to_string(), + )), + reconnection: TcpClientReconnectionConfig { + enabled: true, + max_retries: Some(5), + interval: IggyDuration::from_str("500ms").unwrap(), + reestablish_after: IggyDuration::from_str("100ms").unwrap(), + }, + ..TcpClientConfig::default() + }; + let client = TcpClient::create(Arc::new(config)).unwrap(); + Client::connect(&client).await.unwrap(); + IggyClient::create(ClientWrapper::Tcp(client), None, None) +} + +async fn setup_resources(client: &IggyClient) { + client + .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD) + .await + .unwrap(); + + client.create_stream(STREAM_NAME).await.unwrap(); + + client + .create_topic( + &Identifier::named(STREAM_NAME).unwrap(), + TOPIC_NAME, + PARTITIONS_COUNT, + CompressionAlgorithm::default(), + None, + IggyExpiry::NeverExpire, + MaxTopicSize::ServerDefault, + ) + .await + .unwrap(); + + client + .create_consumer_group( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + CONSUMER_GROUP_NAME, + ) + .await + .unwrap(); + + for i in 0..TOTAL_MESSAGES { + let message = IggyMessage::from_str(&format!("message-{i}")).unwrap(); + let mut messages = vec![message]; + client + .send_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + &Partitioning::partition_id(0), + &mut messages, + ) + .await + .unwrap(); + } +} + +/// Tests that a stale client receives clean errors and can manually reconnect. 
+#[tokio::test] +#[parallel] +async fn should_handle_stale_client_with_manual_reconnection() { + let mut test_server = create_test_server(); + test_server.start(); + let server_addr = test_server.get_raw_tcp_addr().unwrap(); + + let setup_client = create_client(&server_addr, "500ms").await; + setup_resources(&setup_client).await; + + // Client with 1h heartbeat will become stale + let stale_client = create_client(&server_addr, "1h").await; + stale_client + .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD) + .await + .unwrap(); + + stale_client + .join_consumer_group( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + &Identifier::named(CONSUMER_GROUP_NAME).unwrap(), + ) + .await + .unwrap(); + + let consumer = Consumer::group(Identifier::named(CONSUMER_GROUP_NAME).unwrap()); + + // Poll first 5 messages + let mut messages_polled = 0; + while messages_polled < 5 { + let polled = stale_client + .poll_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + None, + &consumer, + &PollingStrategy::next(), + 1, + true, + ) + .await + .unwrap(); + messages_polled += polled.messages.len(); + } + assert_eq!(messages_polled, 5); + + // Wait for heartbeat timeout + sleep(Duration::from_secs(2)).await; + + // Should get error after stale detection + let mut got_error = false; + for _ in 0..3 { + if stale_client + .poll_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + None, + &consumer, + &PollingStrategy::next(), + 1, + true, + ) + .await + .is_err() + { + got_error = true; + break; + } + sleep(Duration::from_millis(100)).await; + } + assert!(got_error, "Expected error after heartbeat timeout"); + + // Reconnect with new client + drop(stale_client); + let new_client = create_client(&server_addr, "500ms").await; + new_client + .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD) + .await + .unwrap(); + new_client + .join_consumer_group( 
+ &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + &Identifier::named(CONSUMER_GROUP_NAME).unwrap(), + ) + .await + .unwrap(); + + // Poll remaining messages + let mut remaining_polled = 0; + let start = std::time::Instant::now(); + while remaining_polled < 5 && start.elapsed() < Duration::from_secs(5) { + match new_client + .poll_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + None, + &consumer, + &PollingStrategy::next(), + 1, + true, + ) + .await + { + Ok(polled) => remaining_polled += polled.messages.len(), + Err(_) => sleep(Duration::from_millis(100)).await, + } + } + assert_eq!(remaining_polled, 5); + + let _ = setup_client + .delete_stream(&Identifier::named(STREAM_NAME).unwrap()) + .await; + test_server.stop(); +} + +/// Tests that IggyConsumer automatically recovers after stale disconnect. +#[tokio::test] +#[parallel] +async fn should_handle_stale_client_with_auto_reconnection() { + let mut test_server = create_test_server(); + test_server.start(); + let server_addr = test_server.get_raw_tcp_addr().unwrap(); + + let setup_client = create_client(&server_addr, "500ms").await; + setup_resources(&setup_client).await; + + let consumer_client = create_reconnecting_client(&server_addr).await; + consumer_client + .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD) + .await + .unwrap(); + + let mut consumer: IggyConsumer = consumer_client + .consumer_group(CONSUMER_GROUP_NAME, STREAM_NAME, TOPIC_NAME) + .unwrap() + .batch_length(1) + .poll_interval(IggyDuration::from_str("100ms").unwrap()) + .polling_strategy(PollingStrategy::next()) + .auto_join_consumer_group() + .create_consumer_group_if_not_exists() + .auto_commit(AutoCommit::When(AutoCommitWhen::PollingMessages)) + .polling_retry_interval(IggyDuration::from_str("500ms").unwrap()) + .build(); + + consumer.init().await.unwrap(); + + let mut messages_consumed = 0u32; + let start = std::time::Instant::now(); + let 
timeout = Duration::from_secs(15); + + while messages_consumed < TOTAL_MESSAGES && start.elapsed() < timeout { + let poll_result: Option<Result<ReceivedMessage, IggyError>> = + tokio::time::timeout(Duration::from_millis(500), consumer.next()) + .await + .ok() + .flatten(); + + if let Some(Ok(_)) = poll_result { + messages_consumed += 1; + if messages_consumed == 5 { + sleep(Duration::from_millis(2500)).await; + } + } + } + + drop(consumer); + let _ = setup_client + .delete_stream(&Identifier::named(STREAM_NAME).unwrap()) + .await; + test_server.stop(); + + assert_eq!( + messages_consumed, TOTAL_MESSAGES, + "Should consume all messages after automatic reconnection" + ); +} diff --git a/core/sdk/src/clients/consumer.rs b/core/sdk/src/clients/consumer.rs index 3c2a23666..3d7784938 100644 --- a/core/sdk/src/clients/consumer.rs +++ b/core/sdk/src/clients/consumer.rs @@ -655,14 +655,22 @@ impl IggyConsumer { let last_stored_offset = self.last_stored_offsets.clone(); let last_consumed_offset = self.last_consumed_offsets.clone(); let allow_replay = self.allow_replay; + let is_consumer_group = self.is_consumer_group; + let joined_consumer_group = self.joined_consumer_group.clone(); async move { if interval > 0 { Self::wait_before_polling(interval, last_polled_at.load(ORDERING)).await; } - if !can_poll.load(ORDERING) { - trace!("Trying to poll messages in {retry_interval}..."); + while !can_poll.load(ORDERING) + || (is_consumer_group && !joined_consumer_group.load(ORDERING)) + { + trace!( + "Cannot poll yet (can_poll={}, joined_cg={}), waiting {retry_interval}...", + can_poll.load(ORDERING), + joined_consumer_group.load(ORDERING) + ); sleep(retry_interval.get_duration()).await; } @@ -772,10 +780,17 @@ impl IggyConsumer { let error = polled_messages.unwrap_err(); error!("Failed to poll messages: {error}"); + + // Handle connection/auth errors - disable polling until event task re-enables + // it after reconnection and rejoin complete if matches!( error, 
IggyError::Disconnected | IggyError::Unauthenticated | IggyError::StaleClient ) { + can_poll.store(false, ORDERING); + if is_consumer_group { + joined_consumer_group.store(false, ORDERING); + } trace!("Retrying to poll messages in {retry_interval}..."); sleep(retry_interval.get_duration()).await; } diff --git a/core/server/src/shard/system/utils.rs b/core/server/src/shard/system/utils.rs index 2eb8f55d5..e7ad661b1 100644 --- a/core/server/src/shard/system/utils.rs +++ b/core/server/src/shard/system/utils.rs @@ -133,6 +133,11 @@ impl IggyShard { ))) } ConsumerKind::ConsumerGroup => { + // Client may have been removed by heartbeat verifier while request was in-flight + if self.client_manager.try_get_client(client_id).is_none() { + return Err(IggyError::StaleClient); + } + self.ensure_consumer_group_exists(stream_id, topic_id, &consumer.id)?; let cg_id = self.streams.with_consumer_group_by_id( stream_id, @@ -146,6 +151,10 @@ impl IggyShard { &consumer.id, topics::helpers::get_consumer_group_member_id(client_id), ) else { + // Client might have been removed between check above and here + if self.client_manager.try_get_client(client_id).is_none() { + return Err(IggyError::StaleClient); + } return Err(IggyError::ConsumerGroupMemberNotFound( client_id, consumer.id.clone(),
