This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new 66a9c7c4c fix(server): report correct current_offset after segment 
deletion (#2798)
66a9c7c4c is described below

commit 66a9c7c4c4b38b6378ec6e45c1b79e1d3d84ed7f
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Mon Feb 23 13:47:11 2026 +0100

    fix(server): report correct current_offset after segment deletion (#2798)
---
 .../server/scenarios/purge_delete_scenario.rs      | 40 ++++++++++++++++++++++
 core/server/src/shard/system/consumer_offsets.rs   |  5 +--
 2 files changed, 41 insertions(+), 4 deletions(-)

diff --git a/core/integration/tests/server/scenarios/purge_delete_scenario.rs 
b/core/integration/tests/server/scenarios/purge_delete_scenario.rs
index ac3af17fe..2e971be40 100644
--- a/core/integration/tests/server/scenarios/purge_delete_scenario.rs
+++ b/core/integration/tests/server/scenarios/purge_delete_scenario.rs
@@ -130,6 +130,26 @@ pub async fn run(harness: &mut TestHarness, 
restart_server: bool) {
         "Messages 7..25 survive"
     );
 
+    // After deleting segment 0 (7 messages removed): current_offset must still
+    // reflect the true partition max (24), not messages_count - 1 (17).
+    {
+        let max_offset = (TOTAL_MESSAGES - 1) as u64;
+        let offset_info = client
+            .get_consumer_offset(&consumer, &stream_ident, &topic_ident, 
Some(PARTITION_ID))
+            .await
+            .unwrap()
+            .expect("consumer offset must exist after segment deletion");
+        assert_eq!(offset_info.stored_offset, stored_offset);
+        assert_eq!(
+            offset_info.current_offset,
+            max_offset,
+            "current_offset must be {max_offset} (true partition max), \
+             got {} (messages_count - 1 = {})",
+            offset_info.current_offset,
+            TOTAL_MESSAGES as u64 - MSGS_PER_SEALED_SEGMENT - 1,
+        );
+    }
+
     // --- Barrier prevents deletion ---
     client
         .delete_segments(&stream_ident, &topic_ident, PARTITION_ID, 1)
@@ -174,6 +194,26 @@ pub async fn run(harness: &mut TestHarness, 
restart_server: bool) {
         "Messages 14..25 survive"
     );
 
+    // After deleting segments 0 and 1 (14 messages removed): current_offset
+    // must still be 24, not messages_count - 1 (10).
+    {
+        let max_offset = (TOTAL_MESSAGES - 1) as u64;
+        let offset_info = client
+            .get_consumer_offset(&consumer, &stream_ident, &topic_ident, 
Some(PARTITION_ID))
+            .await
+            .unwrap()
+            .expect("consumer offset must exist after second deletion");
+        assert_eq!(offset_info.stored_offset, seg1_end_offset);
+        assert_eq!(
+            offset_info.current_offset,
+            max_offset,
+            "current_offset must remain {max_offset} after deleting two 
segments, \
+             got {} (messages_count - 1 = {})",
+            offset_info.current_offset,
+            TOTAL_MESSAGES as u64 - 2 * MSGS_PER_SEALED_SEGMENT - 1,
+        );
+    }
+
     // --- Consumer not stuck ---
     let polled_next = client
         .poll_messages(
diff --git a/core/server/src/shard/system/consumer_offsets.rs 
b/core/server/src/shard/system/consumer_offsets.rs
index d03d7971b..c2572231b 100644
--- a/core/server/src/shard/system/consumer_offsets.rs
+++ b/core/server/src/shard/system/consumer_offsets.rs
@@ -143,10 +143,7 @@ impl IggyShard {
         let partition_current_offset = self
             .metadata
             .get_partition_stats(&ns)
-            .map(|s| {
-                let count = s.messages_count_inconsistent();
-                if count > 0 { count - 1 } else { 0 }
-            })
+            .map(|s| s.current_offset())
             .unwrap_or(0);
 
         let offset = match polling_consumer {

Reply via email to