This is an automated email from the ASF dual-hosted git repository.
gkoszyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 3dbfc6959 fix(server): delete consumer_group offset, when deleting
consumer_group (#2352)
3dbfc6959 is described below
commit 3dbfc69599968c5793344c5fee8c32b55317c36d
Author: Grzegorz Koszyk <[email protected]>
AuthorDate: Sat Nov 15 18:58:40 2025 +0100
fix(server): delete consumer_group offset, when deleting consumer_group
(#2352)
This PR fixes `delete_consumer_group`: deleting a consumer group now also
removes that group's stored offsets (including the offset files on disk)
from every partition that was assigned to the group.
---
Cargo.lock | 6 +-
Cargo.toml | 6 +-
DEPENDENCIES.md | 6 +-
core/binary_protocol/Cargo.toml | 2 +-
core/common/Cargo.toml | 2 +-
core/integration/tests/server/cg.rs | 3 +-
core/integration/tests/server/mod.rs | 5 +
.../consumer_group_offset_cleanup_scenario.rs | 201 +++++++++++++++++++++
core/integration/tests/server/scenarios/mod.rs | 1 +
core/sdk/Cargo.toml | 2 +-
.../delete_consumer_group_handler.rs | 18 ++
core/server/src/http/consumer_groups.rs | 18 ++
core/server/src/shard/system/consumer_offsets.rs | 49 ++++-
core/server/src/streaming/topics/consumer_group.rs | 4 +
foreign/python/Cargo.toml | 2 +-
15 files changed, 310 insertions(+), 15 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 5082753d4..90b151250 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4153,7 +4153,7 @@ dependencies = [
[[package]]
name = "iggy"
-version = "0.8.0-edge.4"
+version = "0.8.0-edge.5"
dependencies = [
"async-broadcast",
"async-dropper",
@@ -4316,7 +4316,7 @@ dependencies = [
[[package]]
name = "iggy_binary_protocol"
-version = "0.8.0-edge.4"
+version = "0.8.0-edge.5"
dependencies = [
"anyhow",
"async-broadcast",
@@ -4337,7 +4337,7 @@ dependencies = [
[[package]]
name = "iggy_common"
-version = "0.8.0-edge.4"
+version = "0.8.0-edge.5"
dependencies = [
"aes-gcm",
"ahash 0.8.12",
diff --git a/Cargo.toml b/Cargo.toml
index 243e1729b..3db952da8 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -119,9 +119,9 @@ futures = "0.3.31"
futures-util = "0.3.31"
human-repr = "1.1.0"
humantime = "2.3.0"
-iggy = { path = "core/sdk", version = "0.8.0-edge.4" }
-iggy_binary_protocol = { path = "core/binary_protocol", version =
"0.8.0-edge.4" }
-iggy_common = { path = "core/common", version = "0.8.0-edge.4" }
+iggy = { path = "core/sdk", version = "0.8.0-edge.5" }
+iggy_binary_protocol = { path = "core/binary_protocol", version =
"0.8.0-edge.5" }
+iggy_common = { path = "core/common", version = "0.8.0-edge.5" }
iggy_connector_sdk = { path = "core/connectors/sdk", version = "0.1.0" }
integration = { path = "core/integration" }
keyring = { version = "3.6.3", features = ["sync-secret-service", "vendored"] }
diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md
index 3498c0c30..8050a5231 100644
--- a/DEPENDENCIES.md
+++ b/DEPENDENCIES.md
@@ -368,14 +368,14 @@ icu_provider: 2.1.1, "Unicode-3.0",
ident_case: 1.0.1, "Apache-2.0 OR MIT",
idna: 1.1.0, "Apache-2.0 OR MIT",
idna_adapter: 1.2.1, "Apache-2.0 OR MIT",
-iggy: 0.8.0-edge.4, "Apache-2.0",
+iggy: 0.8.0-edge.5, "Apache-2.0",
iggy-bench: 0.3.0-edge.3, "Apache-2.0",
iggy-bench-dashboard-server: 0.5.0-edge.3, "Apache-2.0",
iggy-cli: 0.10.0-edge.3, "Apache-2.0",
iggy-connectors: 0.2.0-edge.3, "Apache-2.0",
iggy-mcp: 0.2.0-edge.3, "Apache-2.0",
-iggy_binary_protocol: 0.8.0-edge.4, "Apache-2.0",
-iggy_common: 0.8.0-edge.4, "Apache-2.0",
+iggy_binary_protocol: 0.8.0-edge.5, "Apache-2.0",
+iggy_common: 0.8.0-edge.5, "Apache-2.0",
iggy_connector_postgres_sink: 0.1.0, "Apache-2.0",
iggy_connector_postgres_source: 0.1.0, "Apache-2.0",
iggy_connector_quickwit_sink: 0.1.0, "Apache-2.0",
diff --git a/core/binary_protocol/Cargo.toml b/core/binary_protocol/Cargo.toml
index 034fe4399..a4d57b6c0 100644
--- a/core/binary_protocol/Cargo.toml
+++ b/core/binary_protocol/Cargo.toml
@@ -17,7 +17,7 @@
[package]
name = "iggy_binary_protocol"
-version = "0.8.0-edge.4"
+version = "0.8.0-edge.5"
description = "Iggy is the persistent message streaming platform written in
Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing
millions of messages per second."
edition = "2024"
license = "Apache-2.0"
diff --git a/core/common/Cargo.toml b/core/common/Cargo.toml
index ab276b346..eaef15f09 100644
--- a/core/common/Cargo.toml
+++ b/core/common/Cargo.toml
@@ -16,7 +16,7 @@
# under the License.
[package]
name = "iggy_common"
-version = "0.8.0-edge.4"
+version = "0.8.0-edge.5"
description = "Iggy is the persistent message streaming platform written in
Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing
millions of messages per second."
edition = "2024"
license = "Apache-2.0"
diff --git a/core/integration/tests/server/cg.rs
b/core/integration/tests/server/cg.rs
index b97de88d5..736024c25 100644
--- a/core/integration/tests/server/cg.rs
+++ b/core/integration/tests/server/cg.rs
@@ -17,7 +17,7 @@
use crate::server::{
ScenarioFn, auto_commit_reconnection_scenario, join_scenario,
multiple_clients_scenario,
- run_scenario, single_client_scenario,
+ offset_cleanup_scenario, run_scenario, single_client_scenario,
};
use iggy_common::TransportProtocol;
use serial_test::parallel;
@@ -32,6 +32,7 @@ use test_case::test_matrix;
single_client_scenario(),
multiple_clients_scenario(),
auto_commit_reconnection_scenario(),
+ offset_cleanup_scenario(),
]
)]
#[tokio::test]
diff --git a/core/integration/tests/server/mod.rs
b/core/integration/tests/server/mod.rs
index e0b9cd025..a8473c502 100644
--- a/core/integration/tests/server/mod.rs
+++ b/core/integration/tests/server/mod.rs
@@ -32,6 +32,7 @@ use integration::{
};
use scenarios::{
bench_scenario, consumer_group_auto_commit_reconnection_scenario,
consumer_group_join_scenario,
+ consumer_group_offset_cleanup_scenario,
consumer_group_with_multiple_clients_polling_messages_scenario,
consumer_group_with_single_client_polling_messages_scenario,
create_message_payload,
message_headers_scenario, stream_size_validation_scenario,
system_scenario, user_scenario,
@@ -81,6 +82,10 @@ fn auto_commit_reconnection_scenario() -> ScenarioFn {
}
}
+fn offset_cleanup_scenario() -> ScenarioFn {
+ |factory| Box::pin(consumer_group_offset_cleanup_scenario::run(factory))
+}
+
fn bench_scenario() -> ScenarioFn {
|factory| Box::pin(bench_scenario::run(factory))
}
diff --git
a/core/integration/tests/server/scenarios/consumer_group_offset_cleanup_scenario.rs
b/core/integration/tests/server/scenarios/consumer_group_offset_cleanup_scenario.rs
new file mode 100644
index 000000000..dad706b60
--- /dev/null
+++
b/core/integration/tests/server/scenarios/consumer_group_offset_cleanup_scenario.rs
@@ -0,0 +1,201 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::server::scenarios::{PARTITION_ID, STREAM_NAME, TOPIC_NAME, cleanup,
create_client};
+use futures::StreamExt;
+use iggy::prelude::*;
+use iggy_common::ConsumerOffsetInfo;
+use integration::test_server::{ClientFactory, assert_clean_system, login_root};
+use std::str::FromStr;
+use tokio::time::{Duration, sleep};
+
+const TEST_MESSAGES_COUNT: u32 = 100;
+const MESSAGES_TO_CONSUME: u32 = 50;
+const CONSUMER_GROUP_1: &str = "test-consumer-group-1";
+const CONSUMER_GROUP_2: &str = "test-consumer-group-2";
+const CONSUMER_GROUP_3: &str = "test-consumer-group-3";
+
+pub async fn run(client_factory: &dyn ClientFactory) {
+ let client = create_client(client_factory).await;
+ login_root(&client).await;
+ init_system(&client).await;
+ execute_consumer_group_offset_cleanup_scenario(&client).await;
+ cleanup(&client, false).await;
+ assert_clean_system(&client).await;
+}
+
+async fn init_system(client: &IggyClient) {
+ // 1. Create the stream
+ client.create_stream(STREAM_NAME).await.unwrap();
+
+ // 2. Create the topic with 1 partition
+ client
+ .create_topic(
+ &Identifier::named(STREAM_NAME).unwrap(),
+ TOPIC_NAME,
+ 1,
+ CompressionAlgorithm::default(),
+ None,
+ IggyExpiry::NeverExpire,
+ MaxTopicSize::ServerDefault,
+ )
+ .await
+ .unwrap();
+}
+
+async fn execute_consumer_group_offset_cleanup_scenario(client: &IggyClient) {
+ // Step 1: Produce 100 messages
+ produce_messages(client).await;
+
+ // Test with 3 different consumer groups
+ let consumer_groups = [CONSUMER_GROUP_1, CONSUMER_GROUP_2,
CONSUMER_GROUP_3];
+
+ for consumer_group_name in consumer_groups {
+ // Step 2: Create consumer group and consume messages
+ test_consumer_group_lifecycle(client, consumer_group_name).await;
+ }
+}
+
+async fn test_consumer_group_lifecycle(client: &IggyClient,
consumer_group_name: &str) {
+ // Create the consumer group
+ let cg = client
+ .create_consumer_group(
+ &Identifier::named(STREAM_NAME).unwrap(),
+ &Identifier::named(TOPIC_NAME).unwrap(),
+ consumer_group_name,
+ )
+ .await
+ .unwrap();
+ assert_eq!(cg.name, consumer_group_name);
+ // Create a consumer and consume some messages
+ let mut consumer = create_consumer(client, consumer_group_name).await;
+
+ // Consume 50 messages to establish an offset
+ let consumed_messages = consume_messages(&mut consumer,
MESSAGES_TO_CONSUME).await;
+ assert_eq!(consumed_messages.len(), MESSAGES_TO_CONSUME as usize);
+
+ // Wait a bit to ensure auto-commit has processed
+ sleep(Duration::from_secs(2)).await;
+
+ // Verify that offset exists before deletion
+ let offset_before_delete = get_committed_offset(client,
consumer_group_name).await;
+ assert!(
+ offset_before_delete.is_some(),
+ "Offset should exist before deleting consumer group"
+ );
+ let offset_info = offset_before_delete.unwrap();
+ assert_eq!(
+ offset_info.stored_offset,
+ (MESSAGES_TO_CONSUME - 1) as u64,
+ "Committed offset should be at position {} (last consumed message
offset)",
+ MESSAGES_TO_CONSUME - 1
+ );
+
+ // Drop the consumer before deleting the group
+ drop(consumer);
+
+ // Step 3: Delete the consumer group
+ client
+ .delete_consumer_group(
+ &Identifier::named(STREAM_NAME).unwrap(),
+ &Identifier::named(TOPIC_NAME).unwrap(),
+ &Identifier::named(consumer_group_name).unwrap(),
+ )
+ .await
+ .unwrap();
+
+ // Step 4: Try to get offset for the deleted consumer group - should not
exist
+ let offset_after_delete = get_committed_offset(client,
consumer_group_name).await;
+ assert!(
+ offset_after_delete.is_none(),
+ "Offset should not exist after deleting consumer group: {}",
+ consumer_group_name
+ );
+}
+
+async fn produce_messages(client: &IggyClient) {
+ let mut messages = Vec::new();
+ for message_id in 1..=TEST_MESSAGES_COUNT {
+ let payload = format!("test_message_{}", message_id);
+ let message = IggyMessage::from_str(&payload).unwrap();
+ messages.push(message);
+ }
+
+ client
+ .send_messages(
+ &Identifier::named(STREAM_NAME).unwrap(),
+ &Identifier::named(TOPIC_NAME).unwrap(),
+ &Partitioning::partition_id(PARTITION_ID),
+ &mut messages,
+ )
+ .await
+ .unwrap();
+}
+
+async fn create_consumer(client: &IggyClient, consumer_group_name: &str) ->
IggyConsumer {
+ let mut consumer = client
+ .consumer_group(consumer_group_name, STREAM_NAME, TOPIC_NAME)
+ .unwrap()
+ .batch_length(10)
+ .poll_interval(IggyDuration::from_str("10ms").expect("Invalid
duration"))
+ .polling_strategy(PollingStrategy::next())
+ .auto_join_consumer_group()
+ .auto_commit(AutoCommit::IntervalOrAfter(
+ IggyDuration::from_str("100ms").unwrap(),
+ AutoCommitAfter::ConsumingEachMessage,
+ ))
+ .build();
+
+ consumer.init().await.unwrap();
+ consumer
+}
+
+async fn consume_messages(consumer: &mut IggyConsumer, count: u32) ->
Vec<IggyMessage> {
+ let mut consumed_messages = Vec::new();
+ let mut consumed_count = 0;
+
+ while consumed_count < count {
+ if let Some(message_result) = consumer.next().await {
+ match message_result {
+ Ok(polled_message) => {
+ consumed_messages.push(polled_message.message);
+ consumed_count += 1;
+ }
+ Err(e) => panic!("Error while consuming messages: {}", e),
+ }
+ }
+ }
+
+ consumed_messages
+}
+
+async fn get_committed_offset(
+ client: &IggyClient,
+ consumer_group_name: &str,
+) -> Option<ConsumerOffsetInfo> {
+ let consumer =
Consumer::group(Identifier::named(consumer_group_name).unwrap());
+ client
+ .get_consumer_offset(
+ &consumer,
+ &Identifier::named(STREAM_NAME).unwrap(),
+ &Identifier::named(TOPIC_NAME).unwrap(),
+ Some(PARTITION_ID),
+ )
+ .await
+ .unwrap()
+}
diff --git a/core/integration/tests/server/scenarios/mod.rs
b/core/integration/tests/server/scenarios/mod.rs
index e1ffe9c47..c8eb5a142 100644
--- a/core/integration/tests/server/scenarios/mod.rs
+++ b/core/integration/tests/server/scenarios/mod.rs
@@ -20,6 +20,7 @@ pub mod bench_scenario;
pub mod concurrent_scenario;
pub mod consumer_group_auto_commit_reconnection_scenario;
pub mod consumer_group_join_scenario;
+pub mod consumer_group_offset_cleanup_scenario;
pub mod consumer_group_with_multiple_clients_polling_messages_scenario;
pub mod consumer_group_with_single_client_polling_messages_scenario;
pub mod create_message_payload;
diff --git a/core/sdk/Cargo.toml b/core/sdk/Cargo.toml
index 675177914..d7d82d43f 100644
--- a/core/sdk/Cargo.toml
+++ b/core/sdk/Cargo.toml
@@ -17,7 +17,7 @@
[package]
name = "iggy"
-version = "0.8.0-edge.4"
+version = "0.8.0-edge.5"
description = "Iggy is the persistent message streaming platform written in
Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing
millions of messages per second."
edition = "2024"
license = "Apache-2.0"
diff --git
a/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs
b/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs
index eda996c6e..163b70d3a 100644
---
a/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs
+++
b/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs
@@ -24,6 +24,7 @@ use crate::shard::IggyShard;
use crate::shard::transmission::event::ShardEvent;
use crate::slab::traits_ext::EntityMarker;
use crate::state::command::EntryCommand;
+use crate::streaming::polling_consumer::ConsumerGroupId;
use crate::streaming::session::Session;
use anyhow::Result;
use err_trail::ErrContext;
@@ -53,6 +54,7 @@ impl ServerCommandHandler for DeleteConsumerGroup {
)
})?;
let cg_id = cg.id();
+ let partition_ids = cg.partitions();
// Remove all consumer group members from ClientManager using helper
functions to resolve identifiers
let stream_id_usize = shard.streams.with_stream_by_id(
@@ -82,6 +84,22 @@ impl ServerCommandHandler for DeleteConsumerGroup {
}
}
+ let cg_id_spez = ConsumerGroupId(cg_id);
+ // Delete all consumer group offsets for this group using the
specialized method
+ shard.delete_consumer_group_offsets(
+ cg_id_spez,
+ &self.stream_id,
+ &self.topic_id,
+ partition_ids,
+ ).await.with_error(|error| {
+ format!(
+ "{COMPONENT} (error: {error}) - failed to delete consumer
group offsets for group ID: {} in stream: {}, topic: {}",
+ cg_id_spez,
+ self.stream_id,
+ self.topic_id
+ )
+ })?;
+
let event = ShardEvent::DeletedConsumerGroup {
id: cg_id,
stream_id: self.stream_id.clone(),
diff --git a/core/server/src/http/consumer_groups.rs
b/core/server/src/http/consumer_groups.rs
index 4d9637f16..e1e9b2f7e 100644
--- a/core/server/src/http/consumer_groups.rs
+++ b/core/server/src/http/consumer_groups.rs
@@ -24,6 +24,7 @@ use crate::http::shared::AppState;
use crate::slab::traits_ext::{EntityComponentSystem, EntityMarker,
IntoComponents};
use crate::state::command::EntryCommand;
use crate::state::models::CreateConsumerGroupWithId;
+use crate::streaming::polling_consumer::ConsumerGroupId;
use crate::streaming::session::Session;
use axum::debug_handler;
use axum::extract::{Path, State};
@@ -278,6 +279,23 @@ async fn delete_consumer_group(
}
}
+ let cg_id_spez = ConsumerGroupId(cg_id);
+ // Clean up consumer group offsets from all partitions using the
specialized method
+ let partition_ids = consumer_group.partitions();
+ state.shard.shard().delete_consumer_group_offsets(
+ cg_id_spez,
+ &identifier_stream_id,
+ &identifier_topic_id,
+ partition_ids,
+ ).await.with_error(|error| {
+ format!(
+ "{COMPONENT} (error: {error}) - failed to delete consumer
group offsets for group ID: {} in stream: {}, topic: {}",
+ cg_id_spez,
+ identifier_stream_id,
+ identifier_topic_id
+ )
+ })?;
+
// Send event for consumer group deletion
{
let broadcast_future = SendWrapper::new(async {
diff --git a/core/server/src/shard/system/consumer_offsets.rs
b/core/server/src/shard/system/consumer_offsets.rs
index 07fb78d92..23c2af17e 100644
--- a/core/server/src/shard/system/consumer_offsets.rs
+++ b/core/server/src/shard/system/consumer_offsets.rs
@@ -20,7 +20,12 @@
use super::COMPONENT;
use crate::{
shard::IggyShard,
- streaming::{partitions, polling_consumer::PollingConsumer,
session::Session, streams, topics},
+ streaming::{
+ partitions,
+ polling_consumer::{ConsumerGroupId, PollingConsumer},
+ session::Session,
+ streams, topics,
+ },
};
use err_trail::ErrContext;
use iggy_common::{Consumer, ConsumerOffsetInfo, Identifier, IggyError};
@@ -187,6 +192,48 @@ impl IggyShard {
Ok((polling_consumer, partition_id))
}
+ pub async fn delete_consumer_group_offsets(
+ &self,
+ cg_id: ConsumerGroupId,
+ stream_id: &Identifier,
+ topic_id: &Identifier,
+ partition_ids: &[usize],
+ ) -> Result<(), IggyError> {
+ for &partition_id in partition_ids {
+ // Skip if offset does not exist.
+ let has_offset = self
+ .streams
+ .with_partition_by_id(
+ stream_id,
+ topic_id,
+ partition_id,
+ partitions::helpers::get_consumer_group_offset(cg_id),
+ )
+ .is_some();
+ if !has_offset {
+ continue;
+ }
+
+ let path = self.streams
+ .with_partition_by_id(stream_id, topic_id, partition_id,
partitions::helpers::delete_consumer_group_offset(cg_id))
+ .with_error(|error| {
+ format!(
+ "{COMPONENT} (error: {error}) - failed to delete
consumer group offset for group with ID: {} in partition {} of topic with ID:
{} and stream with ID: {}",
+ cg_id, partition_id, topic_id, stream_id
+ )
+ })?;
+
+
self.delete_consumer_offset_from_disk(&path).await.with_error(|error| {
+ format!(
+ "{COMPONENT} (error: {error}) - failed to delete consumer
group offset file for group with ID: {} in partition {} of topic with ID: {}
and stream with ID: {}",
+ cg_id, partition_id, topic_id, stream_id
+ )
+ })?;
+ }
+
+ Ok(())
+ }
+
fn store_consumer_offset_base(
&self,
stream_id: &Identifier,
diff --git a/core/server/src/streaming/topics/consumer_group.rs
b/core/server/src/streaming/topics/consumer_group.rs
index ff624547e..c584e033d 100644
--- a/core/server/src/streaming/topics/consumer_group.rs
+++ b/core/server/src/streaming/topics/consumer_group.rs
@@ -196,6 +196,10 @@ impl ConsumerGroup {
Self { root, members }
}
+ pub fn partitions(&self) -> &Vec<partitions::ContainerId> {
+ &self.root.partitions
+ }
+
pub fn members(&self) -> &ConsumerGroupMembers {
&self.members
}
diff --git a/foreign/python/Cargo.toml b/foreign/python/Cargo.toml
index 40b9f7c8e..7bf97e6e0 100644
--- a/foreign/python/Cargo.toml
+++ b/foreign/python/Cargo.toml
@@ -27,7 +27,7 @@ repository = "https://github.com/apache/iggy"
[dependencies]
bytes = "1.10.1"
-iggy = { path = "../../core/sdk", version = "0.8.0-edge.4" }
+iggy = { path = "../../core/sdk", version = "0.8.0-edge.5" }
pyo3 = "0.26.0"
pyo3-async-runtimes = { version = "0.26.0", features = [
"attributes",