This is an automated email from the ASF dual-hosted git repository.

piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new f7db1281f fix(server): fix store_consumer_offset for consumer groups (#2351)
f7db1281f is described below

commit f7db1281fed9146616474608714bcc36dadf143f
Author: Grzegorz Koszyk <[email protected]>
AuthorDate: Sat Nov 15 17:27:13 2025 +0100

    fix(server): fix store_consumer_offset for consumer groups (#2351)
    
    This PR fixes the storing of consumer offsets for consumer group members
    by storing the ConsumerGroupId instead of the MemberId in the consumer
    offset storage for CG members.
---
 Cargo.lock                                         |   6 +-
 Cargo.toml                                         |   6 +-
 DEPENDENCIES.md                                    |   6 +-
 core/binary_protocol/Cargo.toml                    |   2 +-
 core/common/Cargo.toml                             |   2 +-
 core/integration/tests/server/cg.rs                |   4 +-
 core/integration/tests/server/mod.rs               |  10 +-
 ...umer_group_auto_commit_reconnection_scenario.rs | 201 +++++++++++++++++++++
 core/integration/tests/server/scenarios/mod.rs     |   1 +
 core/sdk/Cargo.toml                                |   2 +-
 core/server/src/bootstrap.rs                       |  19 +-
 core/server/src/shard/system/consumer_offsets.rs   |  43 +++--
 core/server/src/slab/streams.rs                    |  74 +++++---
 .../src/streaming/partitions/consumer_offset.rs    |   7 +-
 core/server/src/streaming/partitions/helpers.rs    |  50 ++---
 core/server/src/streaming/partitions/partition.rs  |   7 +-
 core/server/src/streaming/partitions/storage.rs    |  88 +++++++--
 core/server/src/streaming/polling_consumer.rs      |  28 ++-
 core/server/src/streaming/streams/helpers.rs       |   8 +-
 foreign/python/Cargo.toml                          |   2 +-
 20 files changed, 453 insertions(+), 113 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 9b2474380..5082753d4 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4153,7 +4153,7 @@ dependencies = [
 
 [[package]]
 name = "iggy"
-version = "0.8.0-edge.3"
+version = "0.8.0-edge.4"
 dependencies = [
  "async-broadcast",
  "async-dropper",
@@ -4316,7 +4316,7 @@ dependencies = [
 
 [[package]]
 name = "iggy_binary_protocol"
-version = "0.8.0-edge.3"
+version = "0.8.0-edge.4"
 dependencies = [
  "anyhow",
  "async-broadcast",
@@ -4337,7 +4337,7 @@ dependencies = [
 
 [[package]]
 name = "iggy_common"
-version = "0.8.0-edge.3"
+version = "0.8.0-edge.4"
 dependencies = [
  "aes-gcm",
  "ahash 0.8.12",
diff --git a/Cargo.toml b/Cargo.toml
index 7fb7a4d5a..243e1729b 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -119,9 +119,9 @@ futures = "0.3.31"
 futures-util = "0.3.31"
 human-repr = "1.1.0"
 humantime = "2.3.0"
-iggy = { path = "core/sdk", version = "0.8.0-edge.3" }
-iggy_binary_protocol = { path = "core/binary_protocol", version = 
"0.8.0-edge.3" }
-iggy_common = { path = "core/common", version = "0.8.0-edge.3" }
+iggy = { path = "core/sdk", version = "0.8.0-edge.4" }
+iggy_binary_protocol = { path = "core/binary_protocol", version = 
"0.8.0-edge.4" }
+iggy_common = { path = "core/common", version = "0.8.0-edge.4" }
 iggy_connector_sdk = { path = "core/connectors/sdk", version = "0.1.0" }
 integration = { path = "core/integration" }
 keyring = { version = "3.6.3", features = ["sync-secret-service", "vendored"] }
diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md
index c3be25088..3498c0c30 100644
--- a/DEPENDENCIES.md
+++ b/DEPENDENCIES.md
@@ -368,14 +368,14 @@ icu_provider: 2.1.1, "Unicode-3.0",
 ident_case: 1.0.1, "Apache-2.0 OR MIT",
 idna: 1.1.0, "Apache-2.0 OR MIT",
 idna_adapter: 1.2.1, "Apache-2.0 OR MIT",
-iggy: 0.8.0-edge.3, "Apache-2.0",
+iggy: 0.8.0-edge.4, "Apache-2.0",
 iggy-bench: 0.3.0-edge.3, "Apache-2.0",
 iggy-bench-dashboard-server: 0.5.0-edge.3, "Apache-2.0",
 iggy-cli: 0.10.0-edge.3, "Apache-2.0",
 iggy-connectors: 0.2.0-edge.3, "Apache-2.0",
 iggy-mcp: 0.2.0-edge.3, "Apache-2.0",
-iggy_binary_protocol: 0.8.0-edge.3, "Apache-2.0",
-iggy_common: 0.8.0-edge.3, "Apache-2.0",
+iggy_binary_protocol: 0.8.0-edge.4, "Apache-2.0",
+iggy_common: 0.8.0-edge.4, "Apache-2.0",
 iggy_connector_postgres_sink: 0.1.0, "Apache-2.0",
 iggy_connector_postgres_source: 0.1.0, "Apache-2.0",
 iggy_connector_quickwit_sink: 0.1.0, "Apache-2.0",
diff --git a/core/binary_protocol/Cargo.toml b/core/binary_protocol/Cargo.toml
index 9bf5cc0da..034fe4399 100644
--- a/core/binary_protocol/Cargo.toml
+++ b/core/binary_protocol/Cargo.toml
@@ -17,7 +17,7 @@
 
 [package]
 name = "iggy_binary_protocol"
-version = "0.8.0-edge.3"
+version = "0.8.0-edge.4"
 description = "Iggy is the persistent message streaming platform written in 
Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing 
millions of messages per second."
 edition = "2024"
 license = "Apache-2.0"
diff --git a/core/common/Cargo.toml b/core/common/Cargo.toml
index af9d9f3b9..ab276b346 100644
--- a/core/common/Cargo.toml
+++ b/core/common/Cargo.toml
@@ -16,7 +16,7 @@
 # under the License.
 [package]
 name = "iggy_common"
-version = "0.8.0-edge.3"
+version = "0.8.0-edge.4"
 description = "Iggy is the persistent message streaming platform written in 
Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing 
millions of messages per second."
 edition = "2024"
 license = "Apache-2.0"
diff --git a/core/integration/tests/server/cg.rs 
b/core/integration/tests/server/cg.rs
index 7588eb1d9..b97de88d5 100644
--- a/core/integration/tests/server/cg.rs
+++ b/core/integration/tests/server/cg.rs
@@ -16,7 +16,8 @@
 // under the License.
 
 use crate::server::{
-    ScenarioFn, join_scenario, multiple_clients_scenario, run_scenario, 
single_client_scenario,
+    ScenarioFn, auto_commit_reconnection_scenario, join_scenario, 
multiple_clients_scenario,
+    run_scenario, single_client_scenario,
 };
 use iggy_common::TransportProtocol;
 use serial_test::parallel;
@@ -30,6 +31,7 @@ use test_case::test_matrix;
         join_scenario(),
         single_client_scenario(),
         multiple_clients_scenario(),
+        auto_commit_reconnection_scenario(),
     ]
 )]
 #[tokio::test]
diff --git a/core/integration/tests/server/mod.rs 
b/core/integration/tests/server/mod.rs
index 5200fc53f..e0b9cd025 100644
--- a/core/integration/tests/server/mod.rs
+++ b/core/integration/tests/server/mod.rs
@@ -31,7 +31,7 @@ use integration::{
     websocket_client::WebSocketClientFactory,
 };
 use scenarios::{
-    bench_scenario, consumer_group_join_scenario,
+    bench_scenario, consumer_group_auto_commit_reconnection_scenario, 
consumer_group_join_scenario,
     consumer_group_with_multiple_clients_polling_messages_scenario,
     consumer_group_with_single_client_polling_messages_scenario, 
create_message_payload,
     message_headers_scenario, stream_size_validation_scenario, 
system_scenario, user_scenario,
@@ -73,6 +73,14 @@ fn multiple_clients_scenario() -> ScenarioFn {
     |factory| 
Box::pin(consumer_group_with_multiple_clients_polling_messages_scenario::run(factory))
 }
 
+fn auto_commit_reconnection_scenario() -> ScenarioFn {
+    |factory| {
+        Box::pin(consumer_group_auto_commit_reconnection_scenario::run(
+            factory,
+        ))
+    }
+}
+
 fn bench_scenario() -> ScenarioFn {
     |factory| Box::pin(bench_scenario::run(factory))
 }
diff --git 
a/core/integration/tests/server/scenarios/consumer_group_auto_commit_reconnection_scenario.rs
 
b/core/integration/tests/server/scenarios/consumer_group_auto_commit_reconnection_scenario.rs
new file mode 100644
index 000000000..485f02e35
--- /dev/null
+++ 
b/core/integration/tests/server/scenarios/consumer_group_auto_commit_reconnection_scenario.rs
@@ -0,0 +1,201 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::server::scenarios::{
+    CONSUMER_GROUP_NAME, PARTITION_ID, STREAM_NAME, TOPIC_NAME, create_client,
+};
+use futures::StreamExt;
+use iggy::prelude::*;
+use iggy_common::ConsumerOffsetInfo;
+use integration::test_server::{ClientFactory, login_root};
+use std::str::FromStr;
+use tokio::time::{Duration, sleep};
+
+const TEST_MESSAGES_COUNT: u32 = 100;
+const HALF_MESSAGES_COUNT: u32 = TEST_MESSAGES_COUNT / 2;
+
+pub async fn run(client_factory: &dyn ClientFactory) {
+    let client = create_client(client_factory).await;
+    login_root(&client).await;
+    init_system(&client).await;
+    execute_auto_commit_reconnection_scenario(&client).await;
+}
+
+async fn init_system(client: &IggyClient) {
+    // 1. Create the stream
+    client.create_stream(STREAM_NAME).await.unwrap();
+
+    // 2. Create the topic
+    client
+        .create_topic(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            TOPIC_NAME,
+            1,
+            CompressionAlgorithm::default(),
+            None,
+            IggyExpiry::NeverExpire,
+            MaxTopicSize::ServerDefault,
+        )
+        .await
+        .unwrap();
+
+    // 3. Create the consumer group
+    client
+        .create_consumer_group(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            &Identifier::named(TOPIC_NAME).unwrap(),
+            CONSUMER_GROUP_NAME,
+        )
+        .await
+        .unwrap();
+}
+
+async fn execute_auto_commit_reconnection_scenario(client: &IggyClient) {
+    // Step 1: Produce messages to exactly the same stream/topic/partition
+    produce_messages_to_partition(client).await;
+
+    // Step 2: Create a consumer group with singular consumer that uses 
auto_commit
+    let mut consumer = create_auto_commit_consumer(client).await;
+
+    // Step 3: Consume 50% of the produced messages
+    let consumed_messages = consume_half_messages(&mut consumer).await;
+    assert_eq!(consumed_messages.len(), HALF_MESSAGES_COUNT as usize);
+
+    // Wait a bit to ensure auto-commit has processed
+    sleep(Duration::from_secs(2)).await;
+
+    // Step 4: Disconnect the consumer
+    drop(consumer);
+
+    // Step 5: Check if the committed offset is at the place where it should 
be (halfway through)
+    let committed_offset_after_half = get_committed_offset(client).await;
+    assert!(committed_offset_after_half.is_some());
+    let offset_info = committed_offset_after_half.unwrap();
+    // The offset should be around half of the messages (offset is 0-based)
+    assert_eq!(offset_info.stored_offset, (HALF_MESSAGES_COUNT - 1) as u64);
+
+    // Step 6: Reconnect the consumer
+    let mut consumer = create_auto_commit_consumer(client).await;
+
+    // Step 7: Consume rest of the messages
+    let remaining_messages = consume_remaining_messages(&mut consumer).await;
+    assert_eq!(remaining_messages.len(), HALF_MESSAGES_COUNT as usize);
+
+    // Wait a bit to ensure auto-commit has processed
+    sleep(Duration::from_secs(2)).await;
+
+    // Step 8: Check if the committed offset is at the end
+    let committed_offset_final = get_committed_offset(client).await;
+    assert!(committed_offset_final.is_some());
+    let final_offset_info = committed_offset_final.unwrap();
+    // The offset should be at the last message (offset is 0-based)
+    assert_eq!(
+        final_offset_info.stored_offset,
+        (TEST_MESSAGES_COUNT - 1) as u64
+    );
+
+    drop(consumer);
+}
+
+async fn produce_messages_to_partition(client: &IggyClient) {
+    let mut messages = Vec::new();
+    for message_id in 1..=TEST_MESSAGES_COUNT {
+        let payload = format!("test_message_{}", message_id);
+        let message = IggyMessage::from_str(&payload).unwrap();
+        messages.push(message);
+    }
+
+    client
+        .send_messages(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            &Identifier::named(TOPIC_NAME).unwrap(),
+            &Partitioning::partition_id(PARTITION_ID),
+            &mut messages,
+        )
+        .await
+        .unwrap();
+}
+
+async fn create_auto_commit_consumer(client: &IggyClient) -> IggyConsumer {
+    let mut consumer = client
+        .consumer_group(CONSUMER_GROUP_NAME, STREAM_NAME, TOPIC_NAME)
+        .unwrap()
+        .batch_length(10)
+        .poll_interval(IggyDuration::from_str("100ms").expect("Invalid 
duration"))
+        .polling_strategy(PollingStrategy::next())
+        .auto_join_consumer_group()
+        .auto_commit(AutoCommit::IntervalOrAfter(
+            IggyDuration::from_str("1s").unwrap(),
+            AutoCommitAfter::ConsumingEachMessage,
+        ))
+        .build();
+
+    consumer.init().await.unwrap();
+    consumer
+}
+
+async fn consume_half_messages(consumer: &mut IggyConsumer) -> 
Vec<IggyMessage> {
+    let mut consumed_messages = Vec::new();
+    let mut count = 0;
+
+    while count < HALF_MESSAGES_COUNT {
+        if let Some(message_result) = consumer.next().await {
+            match message_result {
+                Ok(polled_message) => {
+                    consumed_messages.push(polled_message.message);
+                    count += 1;
+                }
+                Err(e) => panic!("Error while consuming messages: {}", e),
+            }
+        }
+    }
+
+    consumed_messages
+}
+
+async fn consume_remaining_messages(consumer: &mut IggyConsumer) -> 
Vec<IggyMessage> {
+    let mut consumed_messages = Vec::new();
+    let mut count = 0;
+
+    while count < HALF_MESSAGES_COUNT {
+        if let Some(message_result) = consumer.next().await {
+            match message_result {
+                Ok(polled_message) => {
+                    consumed_messages.push(polled_message.message);
+                    count += 1;
+                }
+                Err(e) => panic!("Error while consuming remaining messages: 
{}", e),
+            }
+        }
+    }
+
+    consumed_messages
+}
+
+async fn get_committed_offset(client: &IggyClient) -> 
Option<ConsumerOffsetInfo> {
+    let consumer = 
Consumer::group(Identifier::named(CONSUMER_GROUP_NAME).unwrap());
+    client
+        .get_consumer_offset(
+            &consumer,
+            &Identifier::named(STREAM_NAME).unwrap(),
+            &Identifier::named(TOPIC_NAME).unwrap(),
+            Some(PARTITION_ID),
+        )
+        .await
+        .unwrap()
+}
diff --git a/core/integration/tests/server/scenarios/mod.rs 
b/core/integration/tests/server/scenarios/mod.rs
index 05f46901d..e1ffe9c47 100644
--- a/core/integration/tests/server/scenarios/mod.rs
+++ b/core/integration/tests/server/scenarios/mod.rs
@@ -18,6 +18,7 @@
 
 pub mod bench_scenario;
 pub mod concurrent_scenario;
+pub mod consumer_group_auto_commit_reconnection_scenario;
 pub mod consumer_group_join_scenario;
 pub mod consumer_group_with_multiple_clients_polling_messages_scenario;
 pub mod consumer_group_with_single_client_polling_messages_scenario;
diff --git a/core/sdk/Cargo.toml b/core/sdk/Cargo.toml
index 7b228ce65..675177914 100644
--- a/core/sdk/Cargo.toml
+++ b/core/sdk/Cargo.toml
@@ -17,7 +17,7 @@
 
 [package]
 name = "iggy"
-version = "0.8.0-edge.3"
+version = "0.8.0-edge.4"
 description = "Iggy is the persistent message streaming platform written in 
Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing 
millions of messages per second."
 edition = "2024"
 license = "Apache-2.0"
diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs
index b5ba07fce..f7cec4668 100644
--- a/core/server/src/bootstrap.rs
+++ b/core/server/src/bootstrap.rs
@@ -43,12 +43,16 @@ use crate::{
     state::system::{StreamState, TopicState, UserState},
     streaming::{
         partitions::{
-            consumer_offset::ConsumerOffset, 
helpers::create_message_deduplicator,
-            journal::MemoryMessageJournal, log::SegmentedLog, partition,
-            storage::load_consumer_offsets,
+            consumer_offset::ConsumerOffset,
+            helpers::create_message_deduplicator,
+            journal::MemoryMessageJournal,
+            log::SegmentedLog,
+            partition,
+            storage::{load_consumer_group_offsets, load_consumer_offsets},
         },
         persistence::persister::{FilePersister, FileWithSyncPersister, 
PersisterKind},
         personal_access_tokens::personal_access_token::PersonalAccessToken,
+        polling_consumer::ConsumerGroupId,
         segments::{Segment, storage::Storage},
         stats::{PartitionStats, StreamStats, TopicStats},
         storage::SystemStorage,
@@ -63,7 +67,7 @@ use ahash::HashMap;
 use compio::{fs::create_dir_all, runtime::Runtime};
 use err_trail::ErrContext;
 use iggy_common::{
-    ConsumerKind, IggyByteSize, IggyError,
+    IggyByteSize, IggyError,
     defaults::{
         DEFAULT_ROOT_USERNAME, MAX_PASSWORD_LENGTH, MAX_USERNAME_LENGTH, 
MIN_PASSWORD_LENGTH,
         MIN_USERNAME_LENGTH,
@@ -690,7 +694,7 @@ async fn load_partition(
         config.get_consumer_group_offsets_path(stream_id, topic_id, 
partition_id as usize);
 
     let consumer_offset = Arc::new(
-        load_consumer_offsets(&consumer_offset_path, ConsumerKind::Consumer)?
+        load_consumer_offsets(&consumer_offset_path)?
             .into_iter()
             .map(|offset| (offset.consumer_id as usize, offset))
             .collect::<HashMap<usize, ConsumerOffset>>()
@@ -698,10 +702,9 @@ async fn load_partition(
     );
 
     let consumer_group_offset = Arc::new(
-        load_consumer_offsets(&consumer_group_offsets_path, 
ConsumerKind::ConsumerGroup)?
+        load_consumer_group_offsets(&consumer_group_offsets_path)?
             .into_iter()
-            .map(|offset| (offset.consumer_id as usize, offset))
-            .collect::<HashMap<usize, ConsumerOffset>>()
+            .collect::<HashMap<ConsumerGroupId, ConsumerOffset>>()
             .into(),
     );
 
diff --git a/core/server/src/shard/system/consumer_offsets.rs 
b/core/server/src/shard/system/consumer_offsets.rs
index f3f239ff9..07fb78d92 100644
--- a/core/server/src/shard/system/consumer_offsets.rs
+++ b/core/server/src/shard/system/consumer_offsets.rs
@@ -128,12 +128,14 @@ impl IggyShard {
                 partition_id,
                 partitions::helpers::get_consumer_offset(id),
             ),
-            PollingConsumer::ConsumerGroup(_, id) => 
self.streams.with_partition_by_id(
-                stream_id,
-                topic_id,
-                partition_id,
-                partitions::helpers::get_consumer_group_member_offset(id),
-            ),
+            PollingConsumer::ConsumerGroup(consumer_group_id, _) => {
+                self.streams.with_partition_by_id(
+                    stream_id,
+                    topic_id,
+                    partition_id,
+                    
partitions::helpers::get_consumer_group_offset(consumer_group_id),
+                )
+            }
         };
         Ok(offset)
     }
@@ -216,13 +218,13 @@ impl IggyShard {
                     ),
                 );
             }
-            PollingConsumer::ConsumerGroup(_, id) => {
+            PollingConsumer::ConsumerGroup(consumer_group_id, _) => {
                 self.streams.with_partition_by_id(
                     stream_id,
                     topic_id,
                     partition_id,
-                    partitions::helpers::store_consumer_group_member_offset(
-                        *id,
+                    partitions::helpers::store_consumer_group_offset(
+                        *consumer_group_id,
                         stream_id_num,
                         topic_id_num,
                         partition_id,
@@ -250,11 +252,11 @@ impl IggyShard {
                         )
                     })
             }
-            PollingConsumer::ConsumerGroup(_, id) => {
+            PollingConsumer::ConsumerGroup(consumer_group_id, _) => {
                 self.streams
-                    .with_partition_by_id(stream_id, topic_id, partition_id, 
partitions::helpers::delete_consumer_group_member_offset(*id)).with_error(|error|
 {
+                    .with_partition_by_id(stream_id, topic_id, partition_id, 
partitions::helpers::delete_consumer_group_offset(*consumer_group_id)).with_error(|error|
 {
                         format!(
-                            "{COMPONENT} (error: {error}) - failed to delete 
consumer group member offset for member with ID: {id} in topic with ID: 
{topic_id} and stream with ID: {stream_id}",
+                            "{COMPONENT} (error: {error}) - failed to delete 
consumer group offset for group with ID: {consumer_group_id:?} in topic with 
ID: {topic_id} and stream with ID: {stream_id}",
                         )
                     })
             }
@@ -286,19 +288,20 @@ impl IggyShard {
                 );
                 partitions::storage::persist_offset(&path, offset_value).await
             }
-            PollingConsumer::ConsumerGroup(_, id) => {
+            PollingConsumer::ConsumerGroup(consumer_group_id, _) => {
                 let (offset_value, path) = self.streams.with_partition_by_id(
                     stream_id,
                     topic_id,
                     partition_id,
-                    |(.., offsets, _)| {
+                    move |(.., offsets, _)| {
                         let hdl = offsets.pin();
-                        let item = hdl.get(id).expect(
-                            "persist_consumer_group_member_offset_to_disk: 
offset not found",
-                        );
-                        let offset = 
item.offset.load(std::sync::atomic::Ordering::Relaxed);
-                        let path = item.path.clone();
-                        (offset, path)
+                        let item = hdl
+                            .get(consumer_group_id)
+                            .expect("persist_consumer_offset_to_disk: offset 
not found");
+                        (
+                            
item.offset.load(std::sync::atomic::Ordering::Relaxed),
+                            item.path.clone(),
+                        )
                     },
                 );
                 partitions::storage::persist_offset(&path, offset_value).await
diff --git a/core/server/src/slab/streams.rs b/core/server/src/slab/streams.rs
index 907806f66..12176bd1c 100644
--- a/core/server/src/slab/streams.rs
+++ b/core/server/src/slab/streams.rs
@@ -17,6 +17,7 @@
 
 use crate::shard::task_registry::TaskRegistry;
 use crate::streaming::partitions as streaming_partitions;
+use crate::streaming::partitions::consumer_offset::ConsumerOffset;
 use crate::streaming::stats::StreamStats;
 use crate::{
     binary::handlers::messages::poll_messages_handler::IggyPollMetadata,
@@ -366,27 +367,25 @@ impl MainOps for Streams {
                 Ok(batches)
             }
             PollingKind::Next => {
-                let (consumer_offset, consumer_id) = match consumer {
-                    PollingConsumer::Consumer(consumer_id, _) => (
-                        self.with_partition_by_id(
+                let consumer_offset = match consumer {
+                    PollingConsumer::Consumer(consumer_id, _) => self
+                        .with_partition_by_id(
                             stream_id,
                             topic_id,
                             partition_id,
                             
streaming_partitions::helpers::get_consumer_offset(consumer_id),
                         )
                         .map(|c_offset| c_offset.stored_offset),
-                        consumer_id,
-                    ),
-                    PollingConsumer::ConsumerGroup(cg_id, _) => (
-                        self.with_partition_by_id(
+                    PollingConsumer::ConsumerGroup(consumer_group_id, _) => 
self
+                        .with_partition_by_id(
                             stream_id,
                             topic_id,
                             partition_id,
-                            
streaming_partitions::helpers::get_consumer_group_member_offset(cg_id),
+                            
streaming_partitions::helpers::get_consumer_group_offset(
+                                consumer_group_id,
+                            ),
                         )
                         .map(|cg_offset| cg_offset.stored_offset),
-                        cg_id,
-                    ),
                 };
 
                 if consumer_offset.is_none() {
@@ -397,12 +396,25 @@ impl MainOps for Streams {
                 } else {
                     let consumer_offset = consumer_offset.unwrap();
                     let offset = consumer_offset + 1;
-                    tracing::trace!(
-                        "Getting next messages for consumer id: {} for 
partition: {} from offset: {}...",
-                        consumer_id,
-                        partition_id,
-                        offset
-                    );
+                    match consumer {
+                        PollingConsumer::Consumer(consumer_id, _) => {
+                            tracing::trace!(
+                                "Getting next messages for consumer id: {} for 
partition: {} from offset: {}...",
+                                consumer_id,
+                                partition_id,
+                                offset
+                            );
+                        }
+                        PollingConsumer::ConsumerGroup(consumer_group_id, 
member_id) => {
+                            tracing::trace!(
+                                "Getting next messages for consumer group: {} 
member: {} for partition: {} from offset: {}...",
+                                consumer_group_id.0,
+                                member_id.0,
+                                partition_id,
+                                offset
+                            );
+                        }
+                    }
                     let batches = self
                         .get_messages_by_offset(stream_id, topic_id, 
partition_id, offset, count)
                         .await?;
@@ -1406,6 +1418,14 @@ impl Streams {
 
         match consumer {
             PollingConsumer::Consumer(consumer_id, _) => {
+                tracing::trace!(
+                    "Auto-committing offset {} for consumer {} on stream {}, 
topic {}, partition {}",
+                    offset,
+                    consumer_id,
+                    numeric_stream_id,
+                    numeric_topic_id,
+                    partition_id
+                );
                 let (offset_value, path) = self.with_partition_by_id(
                     stream_id,
                     topic_id,
@@ -1427,7 +1447,15 @@ impl Streams {
                 );
                 crate::streaming::partitions::storage::persist_offset(&path, 
offset_value).await?;
             }
-            PollingConsumer::ConsumerGroup(cg_id, _) => {
+            PollingConsumer::ConsumerGroup(consumer_group_id, _) => {
+                tracing::trace!(
+                    "Auto-committing offset {} for consumer group {} on stream 
{}, topic {}, partition {}",
+                    offset,
+                    consumer_group_id.0,
+                    numeric_stream_id,
+                    numeric_topic_id,
+                    partition_id
+                );
                 let (offset_value, path) = self.with_partition_by_id(
                     stream_id,
                     topic_id,
@@ -1435,10 +1463,14 @@ impl Streams {
                     |(.., offsets, _)| {
                         let hdl = offsets.pin();
                         let item = hdl.get_or_insert(
-                            cg_id,
-                            
crate::streaming::partitions::consumer_offset::ConsumerOffset::default_for_consumer_group(
-                                cg_id as u32,
-                                
&config.get_consumer_group_offsets_path(numeric_stream_id, numeric_topic_id, 
partition_id),
+                            consumer_group_id,
+                            ConsumerOffset::default_for_consumer_group(
+                                consumer_group_id,
+                                &config.get_consumer_group_offsets_path(
+                                    numeric_stream_id,
+                                    numeric_topic_id,
+                                    partition_id,
+                                ),
                             ),
                         );
                         item.offset.store(offset, Ordering::Relaxed);
diff --git a/core/server/src/streaming/partitions/consumer_offset.rs 
b/core/server/src/streaming/partitions/consumer_offset.rs
index 4f88e9906..c081cc0a2 100644
--- a/core/server/src/streaming/partitions/consumer_offset.rs
+++ b/core/server/src/streaming/partitions/consumer_offset.rs
@@ -17,6 +17,7 @@
 
 use std::sync::atomic::AtomicU64;
 
+use crate::streaming::polling_consumer::ConsumerGroupId;
 use iggy_common::ConsumerKind;
 
 #[derive(Debug)]
@@ -48,12 +49,12 @@ impl ConsumerOffset {
         }
     }
 
-    pub fn default_for_consumer_group(consumer_id: u32, path: &str) -> Self {
+    pub fn default_for_consumer_group(consumer_group_id: ConsumerGroupId, 
path: &str) -> Self {
         Self {
             kind: ConsumerKind::ConsumerGroup,
-            consumer_id,
+            consumer_id: consumer_group_id.0 as u32,
             offset: AtomicU64::new(0),
-            path: format!("{path}/{consumer_id}"),
+            path: format!("{path}/{}", consumer_group_id.0),
         }
     }
 
diff --git a/core/server/src/streaming/partitions/helpers.rs 
b/core/server/src/streaming/partitions/helpers.rs
index 549156eee..1f2c85633 100644
--- a/core/server/src/streaming/partitions/helpers.rs
+++ b/core/server/src/streaming/partitions/helpers.rs
@@ -38,6 +38,7 @@ use crate::{
             partition::{self, PartitionRef, PartitionRefMut},
             storage,
         },
+        polling_consumer::ConsumerGroupId,
         segments::{IggyIndexesMut, IggyMessagesBatchMut, IggyMessagesBatchSet, 
storage::Storage},
     },
 };
@@ -136,15 +137,18 @@ pub fn get_consumer_offset(
     }
 }
 
-pub fn get_consumer_group_member_offset(
-    id: usize,
+pub fn get_consumer_group_offset(
+    consumer_group_id: ConsumerGroupId,
 ) -> impl FnOnce(ComponentsById<PartitionRef>) -> Option<ConsumerOffsetInfo> {
     move |(root, _, _, current_offset, _, offsets, _)| {
-        offsets.pin().get(&id).map(|item| ConsumerOffsetInfo {
-            partition_id: root.id() as u32,
-            current_offset: current_offset.load(Ordering::Relaxed),
-            stored_offset: item.offset.load(Ordering::Relaxed),
-        })
+        offsets
+            .pin()
+            .get(&consumer_group_id)
+            .map(|item| ConsumerOffsetInfo {
+                partition_id: root.id() as u32,
+                current_offset: current_offset.load(Ordering::Relaxed),
+                stored_offset: item.offset.load(Ordering::Relaxed),
+            })
     }
 }
 
@@ -207,8 +211,8 @@ pub fn delete_consumer_offset_from_disk(
     }
 }
 
-pub fn store_consumer_group_member_offset(
-    id: usize,
+pub fn store_consumer_group_offset(
+    consumer_group_id: ConsumerGroupId,
     stream_id: usize,
     topic_id: usize,
     partition_id: usize,
@@ -218,9 +222,9 @@ pub fn store_consumer_group_member_offset(
     move |(.., offsets, _)| {
         let hdl = offsets.pin();
         let item = hdl.get_or_insert(
-            id,
+            consumer_group_id,
             ConsumerOffset::default_for_consumer_group(
-                id as u32,
+                consumer_group_id,
                 &config.get_consumer_group_offsets_path(stream_id, topic_id, partition_id),
             ),
         );
@@ -228,39 +232,39 @@ pub fn store_consumer_group_member_offset(
     }
 }
 
-pub fn delete_consumer_group_member_offset(
-    id: usize,
+pub fn delete_consumer_group_offset(
+    consumer_group_id: ConsumerGroupId,
 ) -> impl FnOnce(ComponentsById<PartitionRef>) -> Result<String, IggyError> {
     move |(.., offsets, _)| {
         let hdl = offsets.pin();
         let offset = hdl
-            .remove(&id)
-            .ok_or_else(|| IggyError::ConsumerOffsetNotFound(id))?;
+            .remove(&consumer_group_id)
+            .ok_or_else(|| IggyError::ConsumerOffsetNotFound(consumer_group_id.0))?;
         Ok(offset.path.clone())
     }
 }
 
-pub fn persist_consumer_group_member_offset_to_disk(
-    id: usize,
+pub fn persist_consumer_group_offset_to_disk(
+    consumer_group_id: ConsumerGroupId,
 ) -> impl AsyncFnOnce(ComponentsById<PartitionRef>) -> Result<(), IggyError> {
     async move |(.., offsets, _)| {
         let hdl = offsets.pin();
         let item = hdl
-            .get(&id)
-            .expect("persist_consumer_group_member_offset_to_disk: offset not found");
+            .get(&consumer_group_id)
+            .expect("persist_consumer_group_offset_to_disk: offset not found");
         let offset = item.offset.load(Ordering::Relaxed);
         storage::persist_offset(&item.path, offset).await
     }
 }
 
-pub fn delete_consumer_group_member_offset_from_disk(
-    id: usize,
+pub fn delete_consumer_group_offset_from_disk(
+    consumer_group_id: ConsumerGroupId,
 ) -> impl AsyncFnOnce(ComponentsById<PartitionRef>) -> Result<(), IggyError> {
     async move |(.., offsets, _)| {
         let hdl = offsets.pin();
         let item = hdl
-            .get(&id)
-            .expect("delete_consumer_group_member_offset_from_disk: offset not found");
+            .get(&consumer_group_id)
+            .expect("delete_consumer_group_offset_from_disk: offset not found");
         let path = &item.path;
         storage::delete_persisted_offset(path).await
     }
diff --git a/core/server/src/streaming/partitions/partition.rs b/core/server/src/streaming/partitions/partition.rs
index 4dc42f0d2..7a07b8f33 100644
--- a/core/server/src/streaming/partitions/partition.rs
+++ b/core/server/src/streaming/partitions/partition.rs
@@ -29,6 +29,7 @@ use crate::{
             consumer_offset, helpers::create_message_deduplicator, journal::MemoryMessageJournal,
             log::SegmentedLog,
         },
+        polling_consumer::ConsumerGroupId,
         stats::{PartitionStats, TopicStats},
     },
 };
@@ -69,7 +70,7 @@ impl std::ops::DerefMut for ConsumerOffsets {
 }
 
 #[derive(Debug, Clone)]
-pub struct ConsumerGroupOffsets(papaya::HashMap<usize, consumer_offset::ConsumerOffset>);
+pub struct ConsumerGroupOffsets(papaya::HashMap<ConsumerGroupId, consumer_offset::ConsumerOffset>);
 
 impl ConsumerGroupOffsets {
     pub fn with_capacity(capacity: usize) -> Self {
@@ -79,7 +80,7 @@ impl ConsumerGroupOffsets {
 
 impl<I> From<I> for ConsumerGroupOffsets
 where
-    I: IntoIterator<Item = (usize, consumer_offset::ConsumerOffset)>,
+    I: IntoIterator<Item = (ConsumerGroupId, consumer_offset::ConsumerOffset)>,
 {
     fn from(iter: I) -> Self {
         Self(papaya::HashMap::from_iter(iter))
@@ -87,7 +88,7 @@ where
 }
 
 impl std::ops::Deref for ConsumerGroupOffsets {
-    type Target = papaya::HashMap<usize, consumer_offset::ConsumerOffset>;
+    type Target = papaya::HashMap<ConsumerGroupId, consumer_offset::ConsumerOffset>;
 
     fn deref(&self) -> &Self::Target {
         &self.0
diff --git a/core/server/src/streaming/partitions/storage.rs b/core/server/src/streaming/partitions/storage.rs
index 9857fa860..9899bb9aa 100644
--- a/core/server/src/streaming/partitions/storage.rs
+++ b/core/server/src/streaming/partitions/storage.rs
@@ -19,6 +19,7 @@ use super::COMPONENT;
 use crate::{
     configs::system::SystemConfig, io::fs_utils::remove_dir_all,
     streaming::partitions::consumer_offset::ConsumerOffset,
+    streaming::polling_consumer::ConsumerGroupId,
 };
 use compio::{
     fs::{self, OpenOptions, create_dir_all},
@@ -159,10 +160,7 @@ pub async fn persist_offset(path: &str, offset: u64) -> Result<(), IggyError> {
     Ok(())
 }
 
-pub fn load_consumer_offsets(
-    path: &str,
-    kind: ConsumerKind,
-) -> Result<Vec<ConsumerOffset>, IggyError> {
+pub fn load_consumer_offsets(path: &str) -> Result<Vec<ConsumerOffset>, IggyError> {
     trace!("Loading consumer offsets from path: {path}...");
     let dir_entries = std::fs::read_dir(path);
     if dir_entries.is_err() {
@@ -183,11 +181,9 @@ pub fn load_consumer_offsets(
         }
 
         let name = dir_entry.file_name().into_string().unwrap();
-        let consumer_id = name.parse::<u32>();
-        if consumer_id.is_err() {
-            error!("Invalid consumer ID file with name: '{}'.", name);
-            continue;
-        }
+        let consumer_id = name.parse::<u32>().unwrap_or_else(|_| {
+            panic!("Invalid consumer ID file with name: '{}'.", name);
+        });
 
         let path = dir_entry.path();
         let path = path.to_str();
@@ -197,7 +193,6 @@ pub fn load_consumer_offsets(
         }
 
         let path = path.unwrap().to_string();
-        let consumer_id = consumer_id.unwrap();
         let file = std::fs::File::open(&path)
             .with_error(|error| {
                 format!("{COMPONENT} (error: {error}) - failed to open offset file, path: {path}")
@@ -214,7 +209,7 @@ pub fn load_consumer_offsets(
         let offset = AtomicU64::new(u64::from_le_bytes(offset));
 
         consumer_offsets.push(ConsumerOffset {
-            kind,
+            kind: ConsumerKind::Consumer,
             consumer_id,
             offset,
             path,
@@ -224,3 +219,74 @@ pub fn load_consumer_offsets(
     consumer_offsets.sort_by(|a, b| a.consumer_id.cmp(&b.consumer_id));
     Ok(consumer_offsets)
 }
+
+pub fn load_consumer_group_offsets(
+    path: &str,
+) -> Result<Vec<(ConsumerGroupId, ConsumerOffset)>, IggyError> {
+    trace!("Loading consumer group offsets from path: {path}...");
+    let dir_entries = std::fs::read_dir(path);
+    if dir_entries.is_err() {
+        return Err(IggyError::CannotReadConsumerOffsets(path.to_owned()));
+    }
+
+    let mut consumer_group_offsets = Vec::new();
+    let dir_entries = dir_entries.unwrap();
+    for dir_entry in dir_entries {
+        let dir_entry = dir_entry.unwrap();
+        let metadata = dir_entry.metadata();
+        if metadata.is_err() {
+            break;
+        }
+
+        if metadata.unwrap().is_dir() {
+            continue;
+        }
+
+        let name = dir_entry.file_name().into_string().unwrap();
+
+        let consumer_group_id = name.parse::<u32>().unwrap_or_else(|_| {
+            panic!(
+                "Invalid consumer group ID in consumer group file with name: '{}'.",
+                name
+            );
+        });
+        let consumer_group_id = ConsumerGroupId(consumer_group_id as usize);
+
+        let path = dir_entry.path();
+        let path = path.to_str();
+        if path.is_none() {
+            error!(
+                "Invalid consumer group offset path for file with name: '{}'.",
+                name
+            );
+            continue;
+        }
+
+        let path = path.unwrap().to_string();
+        let file = std::fs::File::open(&path)
+            .with_error(|error| {
+                format!("{COMPONENT} (error: {error}) - failed to open offset file, path: {path}")
+            })
+            .map_err(|_| IggyError::CannotReadFile)?;
+        let mut cursor = std::io::Cursor::new(file);
+        let mut offset = [0; 8];
+        cursor
+            .get_mut().read_exact(&mut offset)
+            .with_error(|error| {
+                format!("{COMPONENT} (error: {error}) - failed to read consumer group offset from file, path: {path}")
+            })
+            .map_err(|_| IggyError::CannotReadFile)?;
+        let offset = AtomicU64::new(u64::from_le_bytes(offset));
+
+        let consumer_offset = ConsumerOffset {
+            kind: ConsumerKind::ConsumerGroup,
+            consumer_id: consumer_group_id.0 as u32,
+            offset,
+            path,
+        };
+
+        consumer_group_offsets.push((consumer_group_id, consumer_offset));
+    }
+
+    Ok(consumer_group_offsets)
+}
diff --git a/core/server/src/streaming/polling_consumer.rs b/core/server/src/streaming/polling_consumer.rs
index 2f75f379e..dcc5f0f4a 100644
--- a/core/server/src/streaming/polling_consumer.rs
+++ b/core/server/src/streaming/polling_consumer.rs
@@ -20,10 +20,28 @@ use crate::streaming::utils::hash;
 use iggy_common::{IdKind, Identifier};
 use std::fmt::{Display, Formatter};
 
+#[derive(Debug, PartialEq, Eq, Hash, Copy, Clone)]
+pub struct ConsumerGroupId(pub usize);
+
+impl Display for ConsumerGroupId {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}", self.0)
+    }
+}
+
+#[derive(Debug, PartialEq, Eq, Hash, Copy, Clone)]
+pub struct MemberId(pub usize);
+
+impl Display for MemberId {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}", self.0)
+    }
+}
+
 #[derive(Debug, PartialEq, Copy, Clone)]
 pub enum PollingConsumer {
-    Consumer(usize, usize),      // Consumer ID + Partition ID
-    ConsumerGroup(usize, usize), // Consumer Group ID + Member ID
+    Consumer(usize, usize),                   // Consumer ID + Partition ID
+    ConsumerGroup(ConsumerGroupId, MemberId), // Consumer Group ID + Member ID
 }
 
 impl PollingConsumer {
@@ -32,7 +50,7 @@ impl PollingConsumer {
     }
 
     pub fn consumer_group(consumer_group_id: usize, member_id: usize) -> Self {
-        PollingConsumer::ConsumerGroup(consumer_group_id, member_id)
+        PollingConsumer::ConsumerGroup(ConsumerGroupId(consumer_group_id), MemberId(member_id))
     }
 
     pub fn resolve_consumer_id(identifier: &Identifier) -> usize {
@@ -103,8 +121,8 @@ mod tests {
 
         match polling_consumer {
             PollingConsumer::ConsumerGroup(consumer_group_id, member_id) => {
-                assert_eq!(consumer_group_id, group_id);
-                assert_eq!(member_id, client_id);
+                assert_eq!(consumer_group_id, ConsumerGroupId(group_id));
+                assert_eq!(member_id, MemberId(client_id));
             }
             _ => panic!("Expected ConsumerGroup"),
         }
diff --git a/core/server/src/streaming/streams/helpers.rs b/core/server/src/streaming/streams/helpers.rs
index ad146bb63..1ffad1227 100644
--- a/core/server/src/streaming/streams/helpers.rs
+++ b/core/server/src/streaming/streams/helpers.rs
@@ -80,8 +80,8 @@ pub fn store_consumer_offset(
     }
 }
 
-pub fn store_consumer_group_member_offset(
-    member_id: usize,
+pub fn store_consumer_group_offset(
+    consumer_group_id: crate::streaming::polling_consumer::ConsumerGroupId,
     topic_id: &Identifier,
     partition_id: usize,
     offset: u64,
@@ -93,8 +93,8 @@ pub fn store_consumer_group_member_offset(
             let topic_id = root.id();
             root.partitions().with_components_by_id(
                 partition_id,
-                partitions::helpers::store_consumer_group_member_offset(
-                    member_id,
+                partitions::helpers::store_consumer_group_offset(
+                    consumer_group_id,
                     stream_id,
                     topic_id,
                     partition_id,
diff --git a/foreign/python/Cargo.toml b/foreign/python/Cargo.toml
index 08eff1198..40b9f7c8e 100644
--- a/foreign/python/Cargo.toml
+++ b/foreign/python/Cargo.toml
@@ -27,7 +27,7 @@ repository = "https://github.com/apache/iggy";
 
 [dependencies]
 bytes = "1.10.1"
-iggy = { path = "../../core/sdk", version = "0.8.0-edge.3" }
+iggy = { path = "../../core/sdk", version = "0.8.0-edge.4" }
 pyo3 = "0.26.0"
 pyo3-async-runtimes = { version = "0.26.0", features = [
     "attributes",


Reply via email to