This is an automated email from the ASF dual-hosted git repository.
piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new f7db1281f fix(server): fix store_consumer_offset for consumer groups
(#2351)
f7db1281f is described below
commit f7db1281fed9146616474608714bcc36dadf143f
Author: Grzegorz Koszyk <[email protected]>
AuthorDate: Sat Nov 15 17:27:13 2025 +0100
fix(server): fix store_consumer_offset for consumer groups (#2351)
This PR addresses the problem of storing consumer offsets for consumer group
members by storing the ConsumerGroupId instead of the MemberId in the
consumer offset storage for consumer group (CG) members.
---
Cargo.lock | 6 +-
Cargo.toml | 6 +-
DEPENDENCIES.md | 6 +-
core/binary_protocol/Cargo.toml | 2 +-
core/common/Cargo.toml | 2 +-
core/integration/tests/server/cg.rs | 4 +-
core/integration/tests/server/mod.rs | 10 +-
...umer_group_auto_commit_reconnection_scenario.rs | 201 +++++++++++++++++++++
core/integration/tests/server/scenarios/mod.rs | 1 +
core/sdk/Cargo.toml | 2 +-
core/server/src/bootstrap.rs | 19 +-
core/server/src/shard/system/consumer_offsets.rs | 43 +++--
core/server/src/slab/streams.rs | 74 +++++---
.../src/streaming/partitions/consumer_offset.rs | 7 +-
core/server/src/streaming/partitions/helpers.rs | 50 ++---
core/server/src/streaming/partitions/partition.rs | 7 +-
core/server/src/streaming/partitions/storage.rs | 88 +++++++--
core/server/src/streaming/polling_consumer.rs | 28 ++-
core/server/src/streaming/streams/helpers.rs | 8 +-
foreign/python/Cargo.toml | 2 +-
20 files changed, 453 insertions(+), 113 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 9b2474380..5082753d4 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4153,7 +4153,7 @@ dependencies = [
[[package]]
name = "iggy"
-version = "0.8.0-edge.3"
+version = "0.8.0-edge.4"
dependencies = [
"async-broadcast",
"async-dropper",
@@ -4316,7 +4316,7 @@ dependencies = [
[[package]]
name = "iggy_binary_protocol"
-version = "0.8.0-edge.3"
+version = "0.8.0-edge.4"
dependencies = [
"anyhow",
"async-broadcast",
@@ -4337,7 +4337,7 @@ dependencies = [
[[package]]
name = "iggy_common"
-version = "0.8.0-edge.3"
+version = "0.8.0-edge.4"
dependencies = [
"aes-gcm",
"ahash 0.8.12",
diff --git a/Cargo.toml b/Cargo.toml
index 7fb7a4d5a..243e1729b 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -119,9 +119,9 @@ futures = "0.3.31"
futures-util = "0.3.31"
human-repr = "1.1.0"
humantime = "2.3.0"
-iggy = { path = "core/sdk", version = "0.8.0-edge.3" }
-iggy_binary_protocol = { path = "core/binary_protocol", version =
"0.8.0-edge.3" }
-iggy_common = { path = "core/common", version = "0.8.0-edge.3" }
+iggy = { path = "core/sdk", version = "0.8.0-edge.4" }
+iggy_binary_protocol = { path = "core/binary_protocol", version =
"0.8.0-edge.4" }
+iggy_common = { path = "core/common", version = "0.8.0-edge.4" }
iggy_connector_sdk = { path = "core/connectors/sdk", version = "0.1.0" }
integration = { path = "core/integration" }
keyring = { version = "3.6.3", features = ["sync-secret-service", "vendored"] }
diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md
index c3be25088..3498c0c30 100644
--- a/DEPENDENCIES.md
+++ b/DEPENDENCIES.md
@@ -368,14 +368,14 @@ icu_provider: 2.1.1, "Unicode-3.0",
ident_case: 1.0.1, "Apache-2.0 OR MIT",
idna: 1.1.0, "Apache-2.0 OR MIT",
idna_adapter: 1.2.1, "Apache-2.0 OR MIT",
-iggy: 0.8.0-edge.3, "Apache-2.0",
+iggy: 0.8.0-edge.4, "Apache-2.0",
iggy-bench: 0.3.0-edge.3, "Apache-2.0",
iggy-bench-dashboard-server: 0.5.0-edge.3, "Apache-2.0",
iggy-cli: 0.10.0-edge.3, "Apache-2.0",
iggy-connectors: 0.2.0-edge.3, "Apache-2.0",
iggy-mcp: 0.2.0-edge.3, "Apache-2.0",
-iggy_binary_protocol: 0.8.0-edge.3, "Apache-2.0",
-iggy_common: 0.8.0-edge.3, "Apache-2.0",
+iggy_binary_protocol: 0.8.0-edge.4, "Apache-2.0",
+iggy_common: 0.8.0-edge.4, "Apache-2.0",
iggy_connector_postgres_sink: 0.1.0, "Apache-2.0",
iggy_connector_postgres_source: 0.1.0, "Apache-2.0",
iggy_connector_quickwit_sink: 0.1.0, "Apache-2.0",
diff --git a/core/binary_protocol/Cargo.toml b/core/binary_protocol/Cargo.toml
index 9bf5cc0da..034fe4399 100644
--- a/core/binary_protocol/Cargo.toml
+++ b/core/binary_protocol/Cargo.toml
@@ -17,7 +17,7 @@
[package]
name = "iggy_binary_protocol"
-version = "0.8.0-edge.3"
+version = "0.8.0-edge.4"
description = "Iggy is the persistent message streaming platform written in
Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing
millions of messages per second."
edition = "2024"
license = "Apache-2.0"
diff --git a/core/common/Cargo.toml b/core/common/Cargo.toml
index af9d9f3b9..ab276b346 100644
--- a/core/common/Cargo.toml
+++ b/core/common/Cargo.toml
@@ -16,7 +16,7 @@
# under the License.
[package]
name = "iggy_common"
-version = "0.8.0-edge.3"
+version = "0.8.0-edge.4"
description = "Iggy is the persistent message streaming platform written in
Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing
millions of messages per second."
edition = "2024"
license = "Apache-2.0"
diff --git a/core/integration/tests/server/cg.rs
b/core/integration/tests/server/cg.rs
index 7588eb1d9..b97de88d5 100644
--- a/core/integration/tests/server/cg.rs
+++ b/core/integration/tests/server/cg.rs
@@ -16,7 +16,8 @@
// under the License.
use crate::server::{
- ScenarioFn, join_scenario, multiple_clients_scenario, run_scenario,
single_client_scenario,
+ ScenarioFn, auto_commit_reconnection_scenario, join_scenario,
multiple_clients_scenario,
+ run_scenario, single_client_scenario,
};
use iggy_common::TransportProtocol;
use serial_test::parallel;
@@ -30,6 +31,7 @@ use test_case::test_matrix;
join_scenario(),
single_client_scenario(),
multiple_clients_scenario(),
+ auto_commit_reconnection_scenario(),
]
)]
#[tokio::test]
diff --git a/core/integration/tests/server/mod.rs
b/core/integration/tests/server/mod.rs
index 5200fc53f..e0b9cd025 100644
--- a/core/integration/tests/server/mod.rs
+++ b/core/integration/tests/server/mod.rs
@@ -31,7 +31,7 @@ use integration::{
websocket_client::WebSocketClientFactory,
};
use scenarios::{
- bench_scenario, consumer_group_join_scenario,
+ bench_scenario, consumer_group_auto_commit_reconnection_scenario,
consumer_group_join_scenario,
consumer_group_with_multiple_clients_polling_messages_scenario,
consumer_group_with_single_client_polling_messages_scenario,
create_message_payload,
message_headers_scenario, stream_size_validation_scenario,
system_scenario, user_scenario,
@@ -73,6 +73,14 @@ fn multiple_clients_scenario() -> ScenarioFn {
|factory|
Box::pin(consumer_group_with_multiple_clients_polling_messages_scenario::run(factory))
}
+fn auto_commit_reconnection_scenario() -> ScenarioFn {
+ |factory| {
+ Box::pin(consumer_group_auto_commit_reconnection_scenario::run(
+ factory,
+ ))
+ }
+}
+
fn bench_scenario() -> ScenarioFn {
|factory| Box::pin(bench_scenario::run(factory))
}
diff --git
a/core/integration/tests/server/scenarios/consumer_group_auto_commit_reconnection_scenario.rs
b/core/integration/tests/server/scenarios/consumer_group_auto_commit_reconnection_scenario.rs
new file mode 100644
index 000000000..485f02e35
--- /dev/null
+++
b/core/integration/tests/server/scenarios/consumer_group_auto_commit_reconnection_scenario.rs
@@ -0,0 +1,201 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::server::scenarios::{
+ CONSUMER_GROUP_NAME, PARTITION_ID, STREAM_NAME, TOPIC_NAME, create_client,
+};
+use futures::StreamExt;
+use iggy::prelude::*;
+use iggy_common::ConsumerOffsetInfo;
+use integration::test_server::{ClientFactory, login_root};
+use std::str::FromStr;
+use tokio::time::{Duration, sleep};
+
+const TEST_MESSAGES_COUNT: u32 = 100;
+const HALF_MESSAGES_COUNT: u32 = TEST_MESSAGES_COUNT / 2;
+
+pub async fn run(client_factory: &dyn ClientFactory) {
+ let client = create_client(client_factory).await;
+ login_root(&client).await;
+ init_system(&client).await;
+ execute_auto_commit_reconnection_scenario(&client).await;
+}
+
+async fn init_system(client: &IggyClient) {
+ // 1. Create the stream
+ client.create_stream(STREAM_NAME).await.unwrap();
+
+ // 2. Create the topic
+ client
+ .create_topic(
+ &Identifier::named(STREAM_NAME).unwrap(),
+ TOPIC_NAME,
+ 1,
+ CompressionAlgorithm::default(),
+ None,
+ IggyExpiry::NeverExpire,
+ MaxTopicSize::ServerDefault,
+ )
+ .await
+ .unwrap();
+
+ // 3. Create the consumer group
+ client
+ .create_consumer_group(
+ &Identifier::named(STREAM_NAME).unwrap(),
+ &Identifier::named(TOPIC_NAME).unwrap(),
+ CONSUMER_GROUP_NAME,
+ )
+ .await
+ .unwrap();
+}
+
+async fn execute_auto_commit_reconnection_scenario(client: &IggyClient) {
+ // Step 1: Produce messages to exactly the same stream/topic/partition
+ produce_messages_to_partition(client).await;
+
+ // Step 2: Create a consumer group with singular consumer that uses
auto_commit
+ let mut consumer = create_auto_commit_consumer(client).await;
+
+ // Step 3: Consume 50% of the produced messages
+ let consumed_messages = consume_half_messages(&mut consumer).await;
+ assert_eq!(consumed_messages.len(), HALF_MESSAGES_COUNT as usize);
+
+ // Wait a bit to ensure auto-commit has processed
+ sleep(Duration::from_secs(2)).await;
+
+ // Step 4: Disconnect the consumer
+ drop(consumer);
+
+ // Step 5: Check if the committed offset is at the place where it should
be (halfway through)
+ let committed_offset_after_half = get_committed_offset(client).await;
+ assert!(committed_offset_after_half.is_some());
+ let offset_info = committed_offset_after_half.unwrap();
+ // The offset should be around half of the messages (offset is 0-based)
+ assert_eq!(offset_info.stored_offset, (HALF_MESSAGES_COUNT - 1) as u64);
+
+ // Step 6: Reconnect the consumer
+ let mut consumer = create_auto_commit_consumer(client).await;
+
+ // Step 7: Consume rest of the messages
+ let remaining_messages = consume_remaining_messages(&mut consumer).await;
+ assert_eq!(remaining_messages.len(), HALF_MESSAGES_COUNT as usize);
+
+ // Wait a bit to ensure auto-commit has processed
+ sleep(Duration::from_secs(2)).await;
+
+ // Step 8: Check if the committed offset is at the end
+ let committed_offset_final = get_committed_offset(client).await;
+ assert!(committed_offset_final.is_some());
+ let final_offset_info = committed_offset_final.unwrap();
+ // The offset should be at the last message (offset is 0-based)
+ assert_eq!(
+ final_offset_info.stored_offset,
+ (TEST_MESSAGES_COUNT - 1) as u64
+ );
+
+ drop(consumer);
+}
+
+async fn produce_messages_to_partition(client: &IggyClient) {
+ let mut messages = Vec::new();
+ for message_id in 1..=TEST_MESSAGES_COUNT {
+ let payload = format!("test_message_{}", message_id);
+ let message = IggyMessage::from_str(&payload).unwrap();
+ messages.push(message);
+ }
+
+ client
+ .send_messages(
+ &Identifier::named(STREAM_NAME).unwrap(),
+ &Identifier::named(TOPIC_NAME).unwrap(),
+ &Partitioning::partition_id(PARTITION_ID),
+ &mut messages,
+ )
+ .await
+ .unwrap();
+}
+
+async fn create_auto_commit_consumer(client: &IggyClient) -> IggyConsumer {
+ let mut consumer = client
+ .consumer_group(CONSUMER_GROUP_NAME, STREAM_NAME, TOPIC_NAME)
+ .unwrap()
+ .batch_length(10)
+ .poll_interval(IggyDuration::from_str("100ms").expect("Invalid
duration"))
+ .polling_strategy(PollingStrategy::next())
+ .auto_join_consumer_group()
+ .auto_commit(AutoCommit::IntervalOrAfter(
+ IggyDuration::from_str("1s").unwrap(),
+ AutoCommitAfter::ConsumingEachMessage,
+ ))
+ .build();
+
+ consumer.init().await.unwrap();
+ consumer
+}
+
+async fn consume_half_messages(consumer: &mut IggyConsumer) ->
Vec<IggyMessage> {
+ let mut consumed_messages = Vec::new();
+ let mut count = 0;
+
+ while count < HALF_MESSAGES_COUNT {
+ if let Some(message_result) = consumer.next().await {
+ match message_result {
+ Ok(polled_message) => {
+ consumed_messages.push(polled_message.message);
+ count += 1;
+ }
+ Err(e) => panic!("Error while consuming messages: {}", e),
+ }
+ }
+ }
+
+ consumed_messages
+}
+
+async fn consume_remaining_messages(consumer: &mut IggyConsumer) ->
Vec<IggyMessage> {
+ let mut consumed_messages = Vec::new();
+ let mut count = 0;
+
+ while count < HALF_MESSAGES_COUNT {
+ if let Some(message_result) = consumer.next().await {
+ match message_result {
+ Ok(polled_message) => {
+ consumed_messages.push(polled_message.message);
+ count += 1;
+ }
+ Err(e) => panic!("Error while consuming remaining messages:
{}", e),
+ }
+ }
+ }
+
+ consumed_messages
+}
+
+async fn get_committed_offset(client: &IggyClient) ->
Option<ConsumerOffsetInfo> {
+ let consumer =
Consumer::group(Identifier::named(CONSUMER_GROUP_NAME).unwrap());
+ client
+ .get_consumer_offset(
+ &consumer,
+ &Identifier::named(STREAM_NAME).unwrap(),
+ &Identifier::named(TOPIC_NAME).unwrap(),
+ Some(PARTITION_ID),
+ )
+ .await
+ .unwrap()
+}
diff --git a/core/integration/tests/server/scenarios/mod.rs
b/core/integration/tests/server/scenarios/mod.rs
index 05f46901d..e1ffe9c47 100644
--- a/core/integration/tests/server/scenarios/mod.rs
+++ b/core/integration/tests/server/scenarios/mod.rs
@@ -18,6 +18,7 @@
pub mod bench_scenario;
pub mod concurrent_scenario;
+pub mod consumer_group_auto_commit_reconnection_scenario;
pub mod consumer_group_join_scenario;
pub mod consumer_group_with_multiple_clients_polling_messages_scenario;
pub mod consumer_group_with_single_client_polling_messages_scenario;
diff --git a/core/sdk/Cargo.toml b/core/sdk/Cargo.toml
index 7b228ce65..675177914 100644
--- a/core/sdk/Cargo.toml
+++ b/core/sdk/Cargo.toml
@@ -17,7 +17,7 @@
[package]
name = "iggy"
-version = "0.8.0-edge.3"
+version = "0.8.0-edge.4"
description = "Iggy is the persistent message streaming platform written in
Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing
millions of messages per second."
edition = "2024"
license = "Apache-2.0"
diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs
index b5ba07fce..f7cec4668 100644
--- a/core/server/src/bootstrap.rs
+++ b/core/server/src/bootstrap.rs
@@ -43,12 +43,16 @@ use crate::{
state::system::{StreamState, TopicState, UserState},
streaming::{
partitions::{
- consumer_offset::ConsumerOffset,
helpers::create_message_deduplicator,
- journal::MemoryMessageJournal, log::SegmentedLog, partition,
- storage::load_consumer_offsets,
+ consumer_offset::ConsumerOffset,
+ helpers::create_message_deduplicator,
+ journal::MemoryMessageJournal,
+ log::SegmentedLog,
+ partition,
+ storage::{load_consumer_group_offsets, load_consumer_offsets},
},
persistence::persister::{FilePersister, FileWithSyncPersister,
PersisterKind},
personal_access_tokens::personal_access_token::PersonalAccessToken,
+ polling_consumer::ConsumerGroupId,
segments::{Segment, storage::Storage},
stats::{PartitionStats, StreamStats, TopicStats},
storage::SystemStorage,
@@ -63,7 +67,7 @@ use ahash::HashMap;
use compio::{fs::create_dir_all, runtime::Runtime};
use err_trail::ErrContext;
use iggy_common::{
- ConsumerKind, IggyByteSize, IggyError,
+ IggyByteSize, IggyError,
defaults::{
DEFAULT_ROOT_USERNAME, MAX_PASSWORD_LENGTH, MAX_USERNAME_LENGTH,
MIN_PASSWORD_LENGTH,
MIN_USERNAME_LENGTH,
@@ -690,7 +694,7 @@ async fn load_partition(
config.get_consumer_group_offsets_path(stream_id, topic_id,
partition_id as usize);
let consumer_offset = Arc::new(
- load_consumer_offsets(&consumer_offset_path, ConsumerKind::Consumer)?
+ load_consumer_offsets(&consumer_offset_path)?
.into_iter()
.map(|offset| (offset.consumer_id as usize, offset))
.collect::<HashMap<usize, ConsumerOffset>>()
@@ -698,10 +702,9 @@ async fn load_partition(
);
let consumer_group_offset = Arc::new(
- load_consumer_offsets(&consumer_group_offsets_path,
ConsumerKind::ConsumerGroup)?
+ load_consumer_group_offsets(&consumer_group_offsets_path)?
.into_iter()
- .map(|offset| (offset.consumer_id as usize, offset))
- .collect::<HashMap<usize, ConsumerOffset>>()
+ .collect::<HashMap<ConsumerGroupId, ConsumerOffset>>()
.into(),
);
diff --git a/core/server/src/shard/system/consumer_offsets.rs
b/core/server/src/shard/system/consumer_offsets.rs
index f3f239ff9..07fb78d92 100644
--- a/core/server/src/shard/system/consumer_offsets.rs
+++ b/core/server/src/shard/system/consumer_offsets.rs
@@ -128,12 +128,14 @@ impl IggyShard {
partition_id,
partitions::helpers::get_consumer_offset(id),
),
- PollingConsumer::ConsumerGroup(_, id) =>
self.streams.with_partition_by_id(
- stream_id,
- topic_id,
- partition_id,
- partitions::helpers::get_consumer_group_member_offset(id),
- ),
+ PollingConsumer::ConsumerGroup(consumer_group_id, _) => {
+ self.streams.with_partition_by_id(
+ stream_id,
+ topic_id,
+ partition_id,
+
partitions::helpers::get_consumer_group_offset(consumer_group_id),
+ )
+ }
};
Ok(offset)
}
@@ -216,13 +218,13 @@ impl IggyShard {
),
);
}
- PollingConsumer::ConsumerGroup(_, id) => {
+ PollingConsumer::ConsumerGroup(consumer_group_id, _) => {
self.streams.with_partition_by_id(
stream_id,
topic_id,
partition_id,
- partitions::helpers::store_consumer_group_member_offset(
- *id,
+ partitions::helpers::store_consumer_group_offset(
+ *consumer_group_id,
stream_id_num,
topic_id_num,
partition_id,
@@ -250,11 +252,11 @@ impl IggyShard {
)
})
}
- PollingConsumer::ConsumerGroup(_, id) => {
+ PollingConsumer::ConsumerGroup(consumer_group_id, _) => {
self.streams
- .with_partition_by_id(stream_id, topic_id, partition_id,
partitions::helpers::delete_consumer_group_member_offset(*id)).with_error(|error|
{
+ .with_partition_by_id(stream_id, topic_id, partition_id,
partitions::helpers::delete_consumer_group_offset(*consumer_group_id)).with_error(|error|
{
format!(
- "{COMPONENT} (error: {error}) - failed to delete
consumer group member offset for member with ID: {id} in topic with ID:
{topic_id} and stream with ID: {stream_id}",
+ "{COMPONENT} (error: {error}) - failed to delete
consumer group offset for group with ID: {consumer_group_id:?} in topic with
ID: {topic_id} and stream with ID: {stream_id}",
)
})
}
@@ -286,19 +288,20 @@ impl IggyShard {
);
partitions::storage::persist_offset(&path, offset_value).await
}
- PollingConsumer::ConsumerGroup(_, id) => {
+ PollingConsumer::ConsumerGroup(consumer_group_id, _) => {
let (offset_value, path) = self.streams.with_partition_by_id(
stream_id,
topic_id,
partition_id,
- |(.., offsets, _)| {
+ move |(.., offsets, _)| {
let hdl = offsets.pin();
- let item = hdl.get(id).expect(
- "persist_consumer_group_member_offset_to_disk:
offset not found",
- );
- let offset =
item.offset.load(std::sync::atomic::Ordering::Relaxed);
- let path = item.path.clone();
- (offset, path)
+ let item = hdl
+ .get(consumer_group_id)
+ .expect("persist_consumer_offset_to_disk: offset
not found");
+ (
+
item.offset.load(std::sync::atomic::Ordering::Relaxed),
+ item.path.clone(),
+ )
},
);
partitions::storage::persist_offset(&path, offset_value).await
diff --git a/core/server/src/slab/streams.rs b/core/server/src/slab/streams.rs
index 907806f66..12176bd1c 100644
--- a/core/server/src/slab/streams.rs
+++ b/core/server/src/slab/streams.rs
@@ -17,6 +17,7 @@
use crate::shard::task_registry::TaskRegistry;
use crate::streaming::partitions as streaming_partitions;
+use crate::streaming::partitions::consumer_offset::ConsumerOffset;
use crate::streaming::stats::StreamStats;
use crate::{
binary::handlers::messages::poll_messages_handler::IggyPollMetadata,
@@ -366,27 +367,25 @@ impl MainOps for Streams {
Ok(batches)
}
PollingKind::Next => {
- let (consumer_offset, consumer_id) = match consumer {
- PollingConsumer::Consumer(consumer_id, _) => (
- self.with_partition_by_id(
+ let consumer_offset = match consumer {
+ PollingConsumer::Consumer(consumer_id, _) => self
+ .with_partition_by_id(
stream_id,
topic_id,
partition_id,
streaming_partitions::helpers::get_consumer_offset(consumer_id),
)
.map(|c_offset| c_offset.stored_offset),
- consumer_id,
- ),
- PollingConsumer::ConsumerGroup(cg_id, _) => (
- self.with_partition_by_id(
+ PollingConsumer::ConsumerGroup(consumer_group_id, _) =>
self
+ .with_partition_by_id(
stream_id,
topic_id,
partition_id,
-
streaming_partitions::helpers::get_consumer_group_member_offset(cg_id),
+
streaming_partitions::helpers::get_consumer_group_offset(
+ consumer_group_id,
+ ),
)
.map(|cg_offset| cg_offset.stored_offset),
- cg_id,
- ),
};
if consumer_offset.is_none() {
@@ -397,12 +396,25 @@ impl MainOps for Streams {
} else {
let consumer_offset = consumer_offset.unwrap();
let offset = consumer_offset + 1;
- tracing::trace!(
- "Getting next messages for consumer id: {} for
partition: {} from offset: {}...",
- consumer_id,
- partition_id,
- offset
- );
+ match consumer {
+ PollingConsumer::Consumer(consumer_id, _) => {
+ tracing::trace!(
+ "Getting next messages for consumer id: {} for
partition: {} from offset: {}...",
+ consumer_id,
+ partition_id,
+ offset
+ );
+ }
+ PollingConsumer::ConsumerGroup(consumer_group_id,
member_id) => {
+ tracing::trace!(
+ "Getting next messages for consumer group: {}
member: {} for partition: {} from offset: {}...",
+ consumer_group_id.0,
+ member_id.0,
+ partition_id,
+ offset
+ );
+ }
+ }
let batches = self
.get_messages_by_offset(stream_id, topic_id,
partition_id, offset, count)
.await?;
@@ -1406,6 +1418,14 @@ impl Streams {
match consumer {
PollingConsumer::Consumer(consumer_id, _) => {
+ tracing::trace!(
+ "Auto-committing offset {} for consumer {} on stream {},
topic {}, partition {}",
+ offset,
+ consumer_id,
+ numeric_stream_id,
+ numeric_topic_id,
+ partition_id
+ );
let (offset_value, path) = self.with_partition_by_id(
stream_id,
topic_id,
@@ -1427,7 +1447,15 @@ impl Streams {
);
crate::streaming::partitions::storage::persist_offset(&path,
offset_value).await?;
}
- PollingConsumer::ConsumerGroup(cg_id, _) => {
+ PollingConsumer::ConsumerGroup(consumer_group_id, _) => {
+ tracing::trace!(
+ "Auto-committing offset {} for consumer group {} on stream
{}, topic {}, partition {}",
+ offset,
+ consumer_group_id.0,
+ numeric_stream_id,
+ numeric_topic_id,
+ partition_id
+ );
let (offset_value, path) = self.with_partition_by_id(
stream_id,
topic_id,
@@ -1435,10 +1463,14 @@ impl Streams {
|(.., offsets, _)| {
let hdl = offsets.pin();
let item = hdl.get_or_insert(
- cg_id,
-
crate::streaming::partitions::consumer_offset::ConsumerOffset::default_for_consumer_group(
- cg_id as u32,
-
&config.get_consumer_group_offsets_path(numeric_stream_id, numeric_topic_id,
partition_id),
+ consumer_group_id,
+ ConsumerOffset::default_for_consumer_group(
+ consumer_group_id,
+ &config.get_consumer_group_offsets_path(
+ numeric_stream_id,
+ numeric_topic_id,
+ partition_id,
+ ),
),
);
item.offset.store(offset, Ordering::Relaxed);
diff --git a/core/server/src/streaming/partitions/consumer_offset.rs
b/core/server/src/streaming/partitions/consumer_offset.rs
index 4f88e9906..c081cc0a2 100644
--- a/core/server/src/streaming/partitions/consumer_offset.rs
+++ b/core/server/src/streaming/partitions/consumer_offset.rs
@@ -17,6 +17,7 @@
use std::sync::atomic::AtomicU64;
+use crate::streaming::polling_consumer::ConsumerGroupId;
use iggy_common::ConsumerKind;
#[derive(Debug)]
@@ -48,12 +49,12 @@ impl ConsumerOffset {
}
}
- pub fn default_for_consumer_group(consumer_id: u32, path: &str) -> Self {
+ pub fn default_for_consumer_group(consumer_group_id: ConsumerGroupId,
path: &str) -> Self {
Self {
kind: ConsumerKind::ConsumerGroup,
- consumer_id,
+ consumer_id: consumer_group_id.0 as u32,
offset: AtomicU64::new(0),
- path: format!("{path}/{consumer_id}"),
+ path: format!("{path}/{}", consumer_group_id.0),
}
}
diff --git a/core/server/src/streaming/partitions/helpers.rs
b/core/server/src/streaming/partitions/helpers.rs
index 549156eee..1f2c85633 100644
--- a/core/server/src/streaming/partitions/helpers.rs
+++ b/core/server/src/streaming/partitions/helpers.rs
@@ -38,6 +38,7 @@ use crate::{
partition::{self, PartitionRef, PartitionRefMut},
storage,
},
+ polling_consumer::ConsumerGroupId,
segments::{IggyIndexesMut, IggyMessagesBatchMut, IggyMessagesBatchSet,
storage::Storage},
},
};
@@ -136,15 +137,18 @@ pub fn get_consumer_offset(
}
}
-pub fn get_consumer_group_member_offset(
- id: usize,
+pub fn get_consumer_group_offset(
+ consumer_group_id: ConsumerGroupId,
) -> impl FnOnce(ComponentsById<PartitionRef>) -> Option<ConsumerOffsetInfo> {
move |(root, _, _, current_offset, _, offsets, _)| {
- offsets.pin().get(&id).map(|item| ConsumerOffsetInfo {
- partition_id: root.id() as u32,
- current_offset: current_offset.load(Ordering::Relaxed),
- stored_offset: item.offset.load(Ordering::Relaxed),
- })
+ offsets
+ .pin()
+ .get(&consumer_group_id)
+ .map(|item| ConsumerOffsetInfo {
+ partition_id: root.id() as u32,
+ current_offset: current_offset.load(Ordering::Relaxed),
+ stored_offset: item.offset.load(Ordering::Relaxed),
+ })
}
}
@@ -207,8 +211,8 @@ pub fn delete_consumer_offset_from_disk(
}
}
-pub fn store_consumer_group_member_offset(
- id: usize,
+pub fn store_consumer_group_offset(
+ consumer_group_id: ConsumerGroupId,
stream_id: usize,
topic_id: usize,
partition_id: usize,
@@ -218,9 +222,9 @@ pub fn store_consumer_group_member_offset(
move |(.., offsets, _)| {
let hdl = offsets.pin();
let item = hdl.get_or_insert(
- id,
+ consumer_group_id,
ConsumerOffset::default_for_consumer_group(
- id as u32,
+ consumer_group_id,
&config.get_consumer_group_offsets_path(stream_id, topic_id,
partition_id),
),
);
@@ -228,39 +232,39 @@ pub fn store_consumer_group_member_offset(
}
}
-pub fn delete_consumer_group_member_offset(
- id: usize,
+pub fn delete_consumer_group_offset(
+ consumer_group_id: ConsumerGroupId,
) -> impl FnOnce(ComponentsById<PartitionRef>) -> Result<String, IggyError> {
move |(.., offsets, _)| {
let hdl = offsets.pin();
let offset = hdl
- .remove(&id)
- .ok_or_else(|| IggyError::ConsumerOffsetNotFound(id))?;
+ .remove(&consumer_group_id)
+ .ok_or_else(||
IggyError::ConsumerOffsetNotFound(consumer_group_id.0))?;
Ok(offset.path.clone())
}
}
-pub fn persist_consumer_group_member_offset_to_disk(
- id: usize,
+pub fn persist_consumer_group_offset_to_disk(
+ consumer_group_id: ConsumerGroupId,
) -> impl AsyncFnOnce(ComponentsById<PartitionRef>) -> Result<(), IggyError> {
async move |(.., offsets, _)| {
let hdl = offsets.pin();
let item = hdl
- .get(&id)
- .expect("persist_consumer_group_member_offset_to_disk: offset not
found");
+ .get(&consumer_group_id)
+ .expect("persist_consumer_group_offset_to_disk: offset not found");
let offset = item.offset.load(Ordering::Relaxed);
storage::persist_offset(&item.path, offset).await
}
}
-pub fn delete_consumer_group_member_offset_from_disk(
- id: usize,
+pub fn delete_consumer_group_offset_from_disk(
+ consumer_group_id: ConsumerGroupId,
) -> impl AsyncFnOnce(ComponentsById<PartitionRef>) -> Result<(), IggyError> {
async move |(.., offsets, _)| {
let hdl = offsets.pin();
let item = hdl
- .get(&id)
- .expect("delete_consumer_group_member_offset_from_disk: offset not
found");
+ .get(&consumer_group_id)
+ .expect("delete_consumer_group_offset_from_disk: offset not
found");
let path = &item.path;
storage::delete_persisted_offset(path).await
}
diff --git a/core/server/src/streaming/partitions/partition.rs
b/core/server/src/streaming/partitions/partition.rs
index 4dc42f0d2..7a07b8f33 100644
--- a/core/server/src/streaming/partitions/partition.rs
+++ b/core/server/src/streaming/partitions/partition.rs
@@ -29,6 +29,7 @@ use crate::{
consumer_offset, helpers::create_message_deduplicator,
journal::MemoryMessageJournal,
log::SegmentedLog,
},
+ polling_consumer::ConsumerGroupId,
stats::{PartitionStats, TopicStats},
},
};
@@ -69,7 +70,7 @@ impl std::ops::DerefMut for ConsumerOffsets {
}
#[derive(Debug, Clone)]
-pub struct ConsumerGroupOffsets(papaya::HashMap<usize,
consumer_offset::ConsumerOffset>);
+pub struct ConsumerGroupOffsets(papaya::HashMap<ConsumerGroupId,
consumer_offset::ConsumerOffset>);
impl ConsumerGroupOffsets {
pub fn with_capacity(capacity: usize) -> Self {
@@ -79,7 +80,7 @@ impl ConsumerGroupOffsets {
impl<I> From<I> for ConsumerGroupOffsets
where
- I: IntoIterator<Item = (usize, consumer_offset::ConsumerOffset)>,
+ I: IntoIterator<Item = (ConsumerGroupId, consumer_offset::ConsumerOffset)>,
{
fn from(iter: I) -> Self {
Self(papaya::HashMap::from_iter(iter))
@@ -87,7 +88,7 @@ where
}
impl std::ops::Deref for ConsumerGroupOffsets {
- type Target = papaya::HashMap<usize, consumer_offset::ConsumerOffset>;
+ type Target = papaya::HashMap<ConsumerGroupId,
consumer_offset::ConsumerOffset>;
fn deref(&self) -> &Self::Target {
&self.0
diff --git a/core/server/src/streaming/partitions/storage.rs
b/core/server/src/streaming/partitions/storage.rs
index 9857fa860..9899bb9aa 100644
--- a/core/server/src/streaming/partitions/storage.rs
+++ b/core/server/src/streaming/partitions/storage.rs
@@ -19,6 +19,7 @@ use super::COMPONENT;
use crate::{
configs::system::SystemConfig, io::fs_utils::remove_dir_all,
streaming::partitions::consumer_offset::ConsumerOffset,
+ streaming::polling_consumer::ConsumerGroupId,
};
use compio::{
fs::{self, OpenOptions, create_dir_all},
@@ -159,10 +160,7 @@ pub async fn persist_offset(path: &str, offset: u64) ->
Result<(), IggyError> {
Ok(())
}
-pub fn load_consumer_offsets(
- path: &str,
- kind: ConsumerKind,
-) -> Result<Vec<ConsumerOffset>, IggyError> {
+pub fn load_consumer_offsets(path: &str) -> Result<Vec<ConsumerOffset>,
IggyError> {
trace!("Loading consumer offsets from path: {path}...");
let dir_entries = std::fs::read_dir(path);
if dir_entries.is_err() {
@@ -183,11 +181,9 @@ pub fn load_consumer_offsets(
}
let name = dir_entry.file_name().into_string().unwrap();
- let consumer_id = name.parse::<u32>();
- if consumer_id.is_err() {
- error!("Invalid consumer ID file with name: '{}'.", name);
- continue;
- }
+ let consumer_id = name.parse::<u32>().unwrap_or_else(|_| {
+ panic!("Invalid consumer ID file with name: '{}'.", name);
+ });
let path = dir_entry.path();
let path = path.to_str();
@@ -197,7 +193,6 @@ pub fn load_consumer_offsets(
}
let path = path.unwrap().to_string();
- let consumer_id = consumer_id.unwrap();
let file = std::fs::File::open(&path)
.with_error(|error| {
format!("{COMPONENT} (error: {error}) - failed to open offset
file, path: {path}")
@@ -214,7 +209,7 @@ pub fn load_consumer_offsets(
let offset = AtomicU64::new(u64::from_le_bytes(offset));
consumer_offsets.push(ConsumerOffset {
- kind,
+ kind: ConsumerKind::Consumer,
consumer_id,
offset,
path,
@@ -224,3 +219,74 @@ pub fn load_consumer_offsets(
consumer_offsets.sort_by(|a, b| a.consumer_id.cmp(&b.consumer_id));
Ok(consumer_offsets)
}
+
+pub fn load_consumer_group_offsets(
+ path: &str,
+) -> Result<Vec<(ConsumerGroupId, ConsumerOffset)>, IggyError> {
+ trace!("Loading consumer group offsets from path: {path}...");
+ let dir_entries = std::fs::read_dir(path);
+ if dir_entries.is_err() {
+ return Err(IggyError::CannotReadConsumerOffsets(path.to_owned()));
+ }
+
+ let mut consumer_group_offsets = Vec::new();
+ let dir_entries = dir_entries.unwrap();
+ for dir_entry in dir_entries {
+ let dir_entry = dir_entry.unwrap();
+ let metadata = dir_entry.metadata();
+ if metadata.is_err() {
+ break;
+ }
+
+ if metadata.unwrap().is_dir() {
+ continue;
+ }
+
+ let name = dir_entry.file_name().into_string().unwrap();
+
+ let consumer_group_id = name.parse::<u32>().unwrap_or_else(|_| {
+ panic!(
+ "Invalid consumer group ID in consumer group file with name:
'{}'.",
+ name
+ );
+ });
+ let consumer_group_id = ConsumerGroupId(consumer_group_id as usize);
+
+ let path = dir_entry.path();
+ let path = path.to_str();
+ if path.is_none() {
+ error!(
+ "Invalid consumer group offset path for file with name: '{}'.",
+ name
+ );
+ continue;
+ }
+
+ let path = path.unwrap().to_string();
+ let file = std::fs::File::open(&path)
+ .with_error(|error| {
+ format!("{COMPONENT} (error: {error}) - failed to open offset
file, path: {path}")
+ })
+ .map_err(|_| IggyError::CannotReadFile)?;
+ let mut cursor = std::io::Cursor::new(file);
+ let mut offset = [0; 8];
+ cursor
+ .get_mut().read_exact(&mut offset)
+ .with_error(|error| {
+ format!("{COMPONENT} (error: {error}) - failed to read
consumer group offset from file, path: {path}")
+ })
+ .map_err(|_| IggyError::CannotReadFile)?;
+ let offset = AtomicU64::new(u64::from_le_bytes(offset));
+
+ let consumer_offset = ConsumerOffset {
+ kind: ConsumerKind::ConsumerGroup,
+ consumer_id: consumer_group_id.0 as u32,
+ offset,
+ path,
+ };
+
+ consumer_group_offsets.push((consumer_group_id, consumer_offset));
+ }
+
+ Ok(consumer_group_offsets)
+}
diff --git a/core/server/src/streaming/polling_consumer.rs
b/core/server/src/streaming/polling_consumer.rs
index 2f75f379e..dcc5f0f4a 100644
--- a/core/server/src/streaming/polling_consumer.rs
+++ b/core/server/src/streaming/polling_consumer.rs
@@ -20,10 +20,28 @@ use crate::streaming::utils::hash;
use iggy_common::{IdKind, Identifier};
use std::fmt::{Display, Formatter};
+#[derive(Debug, PartialEq, Eq, Hash, Copy, Clone)]
+pub struct ConsumerGroupId(pub usize);
+
+impl Display for ConsumerGroupId {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ write!(f, "{}", self.0)
+ }
+}
+
+#[derive(Debug, PartialEq, Eq, Hash, Copy, Clone)]
+pub struct MemberId(pub usize);
+
+impl Display for MemberId {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ write!(f, "{}", self.0)
+ }
+}
+
#[derive(Debug, PartialEq, Copy, Clone)]
pub enum PollingConsumer {
- Consumer(usize, usize), // Consumer ID + Partition ID
- ConsumerGroup(usize, usize), // Consumer Group ID + Member ID
+ Consumer(usize, usize), // Consumer ID + Partition ID
+ ConsumerGroup(ConsumerGroupId, MemberId), // Consumer Group ID + Member ID
}
impl PollingConsumer {
@@ -32,7 +50,7 @@ impl PollingConsumer {
}
pub fn consumer_group(consumer_group_id: usize, member_id: usize) -> Self {
- PollingConsumer::ConsumerGroup(consumer_group_id, member_id)
+ PollingConsumer::ConsumerGroup(ConsumerGroupId(consumer_group_id),
MemberId(member_id))
}
pub fn resolve_consumer_id(identifier: &Identifier) -> usize {
@@ -103,8 +121,8 @@ mod tests {
match polling_consumer {
PollingConsumer::ConsumerGroup(consumer_group_id, member_id) => {
- assert_eq!(consumer_group_id, group_id);
- assert_eq!(member_id, client_id);
+ assert_eq!(consumer_group_id, ConsumerGroupId(group_id));
+ assert_eq!(member_id, MemberId(client_id));
}
_ => panic!("Expected ConsumerGroup"),
}
diff --git a/core/server/src/streaming/streams/helpers.rs
b/core/server/src/streaming/streams/helpers.rs
index ad146bb63..1ffad1227 100644
--- a/core/server/src/streaming/streams/helpers.rs
+++ b/core/server/src/streaming/streams/helpers.rs
@@ -80,8 +80,8 @@ pub fn store_consumer_offset(
}
}
-pub fn store_consumer_group_member_offset(
- member_id: usize,
+pub fn store_consumer_group_offset(
+ consumer_group_id: crate::streaming::polling_consumer::ConsumerGroupId,
topic_id: &Identifier,
partition_id: usize,
offset: u64,
@@ -93,8 +93,8 @@ pub fn store_consumer_group_member_offset(
let topic_id = root.id();
root.partitions().with_components_by_id(
partition_id,
- partitions::helpers::store_consumer_group_member_offset(
- member_id,
+ partitions::helpers::store_consumer_group_offset(
+ consumer_group_id,
stream_id,
topic_id,
partition_id,
diff --git a/foreign/python/Cargo.toml b/foreign/python/Cargo.toml
index 08eff1198..40b9f7c8e 100644
--- a/foreign/python/Cargo.toml
+++ b/foreign/python/Cargo.toml
@@ -27,7 +27,7 @@ repository = "https://github.com/apache/iggy"
[dependencies]
bytes = "1.10.1"
-iggy = { path = "../../core/sdk", version = "0.8.0-edge.3" }
+iggy = { path = "../../core/sdk", version = "0.8.0-edge.4" }
pyo3 = "0.26.0"
pyo3-async-runtimes = { version = "0.26.0", features = [
"attributes",