This is an automated email from the ASF dual-hosted git repository.
hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 66a9c7c4c fix(server): report correct current_offset after segment
deletion (#2798)
66a9c7c4c is described below
commit 66a9c7c4c4b38b6378ec6e45c1b79e1d3d84ed7f
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Mon Feb 23 13:47:11 2026 +0100
fix(server): report correct current_offset after segment deletion (#2798)
---
.../server/scenarios/purge_delete_scenario.rs | 40 ++++++++++++++++++++++
core/server/src/shard/system/consumer_offsets.rs | 5 +--
2 files changed, 41 insertions(+), 4 deletions(-)
diff --git a/core/integration/tests/server/scenarios/purge_delete_scenario.rs
b/core/integration/tests/server/scenarios/purge_delete_scenario.rs
index ac3af17fe..2e971be40 100644
--- a/core/integration/tests/server/scenarios/purge_delete_scenario.rs
+++ b/core/integration/tests/server/scenarios/purge_delete_scenario.rs
@@ -130,6 +130,26 @@ pub async fn run(harness: &mut TestHarness,
restart_server: bool) {
"Messages 7..25 survive"
);
+ // After deleting segment 0 (7 messages removed): current_offset must still
+ // reflect the true partition max (24), not messages_count - 1 (17).
+ {
+ let max_offset = (TOTAL_MESSAGES - 1) as u64;
+ let offset_info = client
+ .get_consumer_offset(&consumer, &stream_ident, &topic_ident,
Some(PARTITION_ID))
+ .await
+ .unwrap()
+ .expect("consumer offset must exist after segment deletion");
+ assert_eq!(offset_info.stored_offset, stored_offset);
+ assert_eq!(
+ offset_info.current_offset,
+ max_offset,
+ "current_offset must be {max_offset} (true partition max), \
+ got {} (messages_count - 1 = {})",
+ offset_info.current_offset,
+ TOTAL_MESSAGES as u64 - MSGS_PER_SEALED_SEGMENT - 1,
+ );
+ }
+
// --- Barrier prevents deletion ---
client
.delete_segments(&stream_ident, &topic_ident, PARTITION_ID, 1)
@@ -174,6 +194,26 @@ pub async fn run(harness: &mut TestHarness,
restart_server: bool) {
"Messages 14..25 survive"
);
+ // After deleting segments 0 and 1 (14 messages removed): current_offset
+ // must still be 24, not messages_count - 1 (10).
+ {
+ let max_offset = (TOTAL_MESSAGES - 1) as u64;
+ let offset_info = client
+ .get_consumer_offset(&consumer, &stream_ident, &topic_ident,
Some(PARTITION_ID))
+ .await
+ .unwrap()
+ .expect("consumer offset must exist after second deletion");
+ assert_eq!(offset_info.stored_offset, seg1_end_offset);
+ assert_eq!(
+ offset_info.current_offset,
+ max_offset,
+ "current_offset must remain {max_offset} after deleting two
segments, \
+ got {} (messages_count - 1 = {})",
+ offset_info.current_offset,
+ TOTAL_MESSAGES as u64 - 2 * MSGS_PER_SEALED_SEGMENT - 1,
+ );
+ }
+
// --- Consumer not stuck ---
let polled_next = client
.poll_messages(
diff --git a/core/server/src/shard/system/consumer_offsets.rs
b/core/server/src/shard/system/consumer_offsets.rs
index d03d7971b..c2572231b 100644
--- a/core/server/src/shard/system/consumer_offsets.rs
+++ b/core/server/src/shard/system/consumer_offsets.rs
@@ -143,10 +143,7 @@ impl IggyShard {
let partition_current_offset = self
.metadata
.get_partition_stats(&ns)
- .map(|s| {
- let count = s.messages_count_inconsistent();
- if count > 0 { count - 1 } else { 0 }
- })
+ .map(|s| s.current_offset())
.unwrap_or(0);
let offset = match polling_consumer {