This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch fix_delete_cg_method in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 0d220a0cc1f65bcd6c58c1ec48d273a82da77f38 Author: numminex <[email protected]> AuthorDate: Sat Nov 15 18:19:49 2025 +0100 fix(server): delete consumer_group offset, when deleting consumer_group --- core/integration/tests/server/cg.rs | 3 +- core/integration/tests/server/mod.rs | 5 + .../consumer_group_offset_cleanup_scenario.rs | 206 +++++++++++++++++++++ core/integration/tests/server/scenarios/mod.rs | 1 + .../delete_consumer_group_handler.rs | 18 ++ core/server/src/http/consumer_groups.rs | 18 ++ core/server/src/shard/system/consumer_offsets.rs | 35 +++- core/server/src/streaming/topics/consumer_group.rs | 4 + 8 files changed, 288 insertions(+), 2 deletions(-) diff --git a/core/integration/tests/server/cg.rs b/core/integration/tests/server/cg.rs index b97de88d5..736024c25 100644 --- a/core/integration/tests/server/cg.rs +++ b/core/integration/tests/server/cg.rs @@ -17,7 +17,7 @@ use crate::server::{ ScenarioFn, auto_commit_reconnection_scenario, join_scenario, multiple_clients_scenario, - run_scenario, single_client_scenario, + offset_cleanup_scenario, run_scenario, single_client_scenario, }; use iggy_common::TransportProtocol; use serial_test::parallel; @@ -32,6 +32,7 @@ use test_case::test_matrix; single_client_scenario(), multiple_clients_scenario(), auto_commit_reconnection_scenario(), + offset_cleanup_scenario(), ] )] #[tokio::test] diff --git a/core/integration/tests/server/mod.rs b/core/integration/tests/server/mod.rs index e0b9cd025..a8473c502 100644 --- a/core/integration/tests/server/mod.rs +++ b/core/integration/tests/server/mod.rs @@ -32,6 +32,7 @@ use integration::{ }; use scenarios::{ bench_scenario, consumer_group_auto_commit_reconnection_scenario, consumer_group_join_scenario, + consumer_group_offset_cleanup_scenario, consumer_group_with_multiple_clients_polling_messages_scenario, consumer_group_with_single_client_polling_messages_scenario, create_message_payload, message_headers_scenario, stream_size_validation_scenario, system_scenario, user_scenario, @@ -81,6 +82,10 @@ fn auto_commit_reconnection_scenario() -> ScenarioFn { } } +fn offset_cleanup_scenario() -> ScenarioFn { + |factory| Box::pin(consumer_group_offset_cleanup_scenario::run(factory)) +} + fn bench_scenario() -> ScenarioFn { |factory| Box::pin(bench_scenario::run(factory)) } diff --git a/core/integration/tests/server/scenarios/consumer_group_offset_cleanup_scenario.rs b/core/integration/tests/server/scenarios/consumer_group_offset_cleanup_scenario.rs new file mode 100644 index 000000000..f81cb186c --- /dev/null +++ b/core/integration/tests/server/scenarios/consumer_group_offset_cleanup_scenario.rs @@ -0,0 +1,206 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use crate::server::scenarios::{PARTITION_ID, STREAM_NAME, TOPIC_NAME, cleanup, create_client}; +use futures::StreamExt; +use iggy::prelude::*; +use iggy_common::ConsumerOffsetInfo; +use integration::test_server::{ClientFactory, assert_clean_system, login_root}; +use std::str::FromStr; +use tokio::time::{Duration, sleep}; + +const TEST_MESSAGES_COUNT: u32 = 100; +const MESSAGES_TO_CONSUME: u32 = 50; +const CONSUMER_GROUP_1: &str = "test-consumer-group-1"; +const CONSUMER_GROUP_2: &str = "test-consumer-group-2"; +const CONSUMER_GROUP_3: &str = "test-consumer-group-3"; + +pub async fn run(client_factory: &dyn ClientFactory) { + let client = create_client(client_factory).await; + login_root(&client).await; + init_system(&client).await; + execute_consumer_group_offset_cleanup_scenario(&client).await; + cleanup(&client, false).await; + assert_clean_system(&client).await; +} + +async fn init_system(client: &IggyClient) { + // 1. Create the stream + client.create_stream(STREAM_NAME).await.unwrap(); + + // 2. Create the topic with 1 partition + client + .create_topic( + &Identifier::named(STREAM_NAME).unwrap(), + TOPIC_NAME, + 1, + CompressionAlgorithm::default(), + None, + IggyExpiry::NeverExpire, + MaxTopicSize::ServerDefault, + ) + .await + .unwrap(); +} + +async fn execute_consumer_group_offset_cleanup_scenario(client: &IggyClient) { + // Step 1: Produce 100 messages + produce_messages(client).await; + + // Test with 3 different consumer groups + let consumer_groups = [CONSUMER_GROUP_1, CONSUMER_GROUP_2, CONSUMER_GROUP_3]; + + for consumer_group_name in consumer_groups { + // Step 2: Create consumer group and consume messages + test_consumer_group_lifecycle(client, consumer_group_name).await; + } +} + +async fn test_consumer_group_lifecycle(client: &IggyClient, consumer_group_name: &str) { + // Create the consumer group + let cg = client + .create_consumer_group( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + consumer_group_name, + ) + .await + .unwrap(); + assert_eq!(cg.name, consumer_group_name); + println!( + "!!!!!!!!!!!!!1Created consumer group: {}", + consumer_group_name + ); + + // Create a consumer and consume some messages + let mut consumer = create_consumer(client, consumer_group_name).await; + + // Consume 50 messages to establish an offset + let consumed_messages = consume_messages(&mut consumer, MESSAGES_TO_CONSUME).await; + assert_eq!(consumed_messages.len(), MESSAGES_TO_CONSUME as usize); + + // Wait a bit to ensure auto-commit has processed + sleep(Duration::from_secs(2)).await; + + // Verify that offset exists before deletion + let offset_before_delete = get_committed_offset(client, consumer_group_name).await; + assert!( + offset_before_delete.is_some(), + "Offset should exist before deleting consumer group" + ); + let offset_info = offset_before_delete.unwrap(); + assert_eq!( + offset_info.stored_offset, + (MESSAGES_TO_CONSUME - 1) as u64, + "Committed offset should be at position {} (last consumed message offset)", + MESSAGES_TO_CONSUME - 1 + ); + + // Drop the consumer before deleting the group + drop(consumer); + + // Step 3: Delete the consumer group + client + .delete_consumer_group( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + &Identifier::named(consumer_group_name).unwrap(), + ) + .await + .unwrap(); + + // Step 4: Try to get offset for the deleted consumer group - should not exist + let offset_after_delete = get_committed_offset(client, consumer_group_name).await; + assert!( + offset_after_delete.is_none(), + "Offset should not exist after deleting consumer group: {}", + consumer_group_name + ); +} + +async fn produce_messages(client: &IggyClient) { + let mut messages = Vec::new(); + for message_id in 1..=TEST_MESSAGES_COUNT { + let payload = format!("test_message_{}", message_id); + let message = IggyMessage::from_str(&payload).unwrap(); + messages.push(message); + } + + client + .send_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + &Partitioning::partition_id(PARTITION_ID), + &mut messages, + ) + .await + .unwrap(); +} + +async fn create_consumer(client: &IggyClient, consumer_group_name: &str) -> IggyConsumer { + let mut consumer = client + .consumer_group(consumer_group_name, STREAM_NAME, TOPIC_NAME) + .unwrap() + .batch_length(10) + .poll_interval(IggyDuration::from_str("10ms").expect("Invalid duration")) + .polling_strategy(PollingStrategy::next()) + .auto_join_consumer_group() + .auto_commit(AutoCommit::IntervalOrAfter( + IggyDuration::from_str("100ms").unwrap(), + AutoCommitAfter::ConsumingEachMessage, + )) + .build(); + + consumer.init().await.unwrap(); + consumer +} + +async fn consume_messages(consumer: &mut IggyConsumer, count: u32) -> Vec<IggyMessage> { + let mut consumed_messages = Vec::new(); + let mut consumed_count = 0; + + while consumed_count < count { + if let Some(message_result) = consumer.next().await { + match message_result { + Ok(polled_message) => { + consumed_messages.push(polled_message.message); + consumed_count += 1; + } + Err(e) => panic!("Error while consuming messages: {}", e), + } + } + } + + consumed_messages +} + +async fn get_committed_offset( + client: &IggyClient, + consumer_group_name: &str, +) -> Option<ConsumerOffsetInfo> { + let consumer = Consumer::group(Identifier::named(consumer_group_name).unwrap()); + client + .get_consumer_offset( + &consumer, + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + Some(PARTITION_ID), + ) + .await + .unwrap() +} diff --git a/core/integration/tests/server/scenarios/mod.rs b/core/integration/tests/server/scenarios/mod.rs index e1ffe9c47..c8eb5a142 100644 --- a/core/integration/tests/server/scenarios/mod.rs +++ b/core/integration/tests/server/scenarios/mod.rs @@ -20,6 +20,7 @@ pub mod bench_scenario; pub mod concurrent_scenario; pub mod consumer_group_auto_commit_reconnection_scenario; pub mod consumer_group_join_scenario; +pub mod consumer_group_offset_cleanup_scenario; pub mod consumer_group_with_multiple_clients_polling_messages_scenario; pub mod consumer_group_with_single_client_polling_messages_scenario; pub mod create_message_payload; diff --git a/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs b/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs index eda996c6e..163b70d3a 100644 --- a/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs +++ b/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs @@ -24,6 +24,7 @@ use crate::shard::IggyShard; use crate::shard::transmission::event::ShardEvent; use crate::slab::traits_ext::EntityMarker; use crate::state::command::EntryCommand; +use crate::streaming::polling_consumer::ConsumerGroupId; use crate::streaming::session::Session; use anyhow::Result; use err_trail::ErrContext; @@ -53,6 +54,7 @@ impl ServerCommandHandler for DeleteConsumerGroup { ) })?; let cg_id = cg.id(); + let partition_ids = cg.partitions(); // Remove all consumer group members from ClientManager using helper functions to resolve identifiers let stream_id_usize = shard.streams.with_stream_by_id( @@ -82,6 +84,22 @@ impl ServerCommandHandler for DeleteConsumerGroup { } } + let cg_id_spez = ConsumerGroupId(cg_id); + // Delete all consumer group offsets for this group using the specialized method + shard.delete_consumer_group_offsets( + cg_id_spez, + &self.stream_id, + &self.topic_id, + partition_ids, + ).await.with_error(|error| { + format!( + "{COMPONENT} (error: {error}) - failed to delete consumer group offsets for group ID: {} in stream: {}, topic: {}", + cg_id_spez, + self.stream_id, + self.topic_id + ) + })?; + let event = ShardEvent::DeletedConsumerGroup { id: cg_id, stream_id: self.stream_id.clone(), diff --git a/core/server/src/http/consumer_groups.rs b/core/server/src/http/consumer_groups.rs index 4d9637f16..e1e9b2f7e 100644 --- a/core/server/src/http/consumer_groups.rs +++ b/core/server/src/http/consumer_groups.rs @@ -24,6 +24,7 @@ use crate::http::shared::AppState; use crate::slab::traits_ext::{EntityComponentSystem, EntityMarker, IntoComponents}; use crate::state::command::EntryCommand; use crate::state::models::CreateConsumerGroupWithId; +use crate::streaming::polling_consumer::ConsumerGroupId; use crate::streaming::session::Session; use axum::debug_handler; use axum::extract::{Path, State}; @@ -278,6 +279,23 @@ async fn delete_consumer_group( } } + let cg_id_spez = ConsumerGroupId(cg_id); + // Clean up consumer group offsets from all partitions using the specialized method + let partition_ids = consumer_group.partitions(); + state.shard.shard().delete_consumer_group_offsets( + cg_id_spez, + &identifier_stream_id, + &identifier_topic_id, + partition_ids, + ).await.with_error(|error| { + format!( + "{COMPONENT} (error: {error}) - failed to delete consumer group offsets for group ID: {} in stream: {}, topic: {}", + cg_id_spez, + identifier_stream_id, + identifier_topic_id + ) + })?; + // Send event for consumer group deletion { let broadcast_future = SendWrapper::new(async { diff --git a/core/server/src/shard/system/consumer_offsets.rs b/core/server/src/shard/system/consumer_offsets.rs index 07fb78d92..f145f5b3a 100644 --- a/core/server/src/shard/system/consumer_offsets.rs +++ b/core/server/src/shard/system/consumer_offsets.rs @@ -20,7 +20,12 @@ use super::COMPONENT; use crate::{ shard::IggyShard, - streaming::{partitions, polling_consumer::PollingConsumer, session::Session, streams, topics}, + streaming::{ + partitions, + polling_consumer::{ConsumerGroupId, PollingConsumer}, + session::Session, + streams, topics, + }, }; use err_trail::ErrContext; use iggy_common::{Consumer, ConsumerOffsetInfo, Identifier, IggyError}; @@ -187,6 +192,34 @@ impl IggyShard { Ok((polling_consumer, partition_id)) } + pub async fn delete_consumer_group_offsets( + &self, + cg_id: ConsumerGroupId, + stream_id: &Identifier, + topic_id: &Identifier, + partition_ids: &[usize], + ) -> Result<(), IggyError> { + for &partition_id in partition_ids { + let path = self.streams + .with_partition_by_id(stream_id, topic_id, partition_id, partitions::helpers::delete_consumer_group_offset(cg_id)) + .with_error(|error| { + format!( + "{COMPONENT} (error: {error}) - failed to delete consumer group offset for group with ID: {} in partition {} of topic with ID: {} and stream with ID: {}", + cg_id, partition_id, topic_id, stream_id + ) + })?; + + self.delete_consumer_offset_from_disk(&path).await.with_error(|error| { + format!( + "{COMPONENT} (error: {error}) - failed to delete consumer group offset file for group with ID: {} in partition {} of topic with ID: {} and stream with ID: {}", + cg_id, partition_id, topic_id, stream_id + ) + })?; + } + + Ok(()) + } + fn store_consumer_offset_base( &self, stream_id: &Identifier, diff --git a/core/server/src/streaming/topics/consumer_group.rs b/core/server/src/streaming/topics/consumer_group.rs index ff624547e..c584e033d 100644 --- a/core/server/src/streaming/topics/consumer_group.rs +++ b/core/server/src/streaming/topics/consumer_group.rs @@ -196,6 +196,10 @@ impl ConsumerGroup { Self { root, members } } + pub fn partitions(&self) -> &Vec<partitions::ContainerId> { + &self.root.partitions + } + pub fn members(&self) -> &ConsumerGroupMembers { &self.members }
