This is an automated email from the ASF dual-hosted git repository.

numinnex pushed a commit to branch integration_tests
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 4b8dbbbfec49fb5cc9f2d35f0b744c8847b6c550
Author: numinex <[email protected]>
AuthorDate: Wed May 6 10:38:35 2026 +0200

    address code review nits
---
 core/server-ng/src/bootstrap.rs    | 39 ++++++++++++++++++++++++++++++++++++++
 core/server-ng/src/server_error.rs | 12 ++++++++++++
 2 files changed, 51 insertions(+)

diff --git a/core/server-ng/src/bootstrap.rs b/core/server-ng/src/bootstrap.rs
index 816a464f5..2c63fa753 100644
--- a/core/server-ng/src/bootstrap.rs
+++ b/core/server-ng/src/bootstrap.rs
@@ -511,6 +511,17 @@ async fn hydrate_partition_log(
         .zip(loaded_log.storages().iter().cloned())
         .enumerate()
     {
+        validate_recovered_segment(
+            stream_id,
+            topic_id,
+            partition_id,
+            segment,
+            &storage,
+            loaded_log
+                .indexes()
+                .get(segment_index)
+                .and_then(|indexes| indexes.as_ref()),
+        )?;
         let max_timestamp = match loaded_log
             .indexes()
             .get(segment_index)
@@ -581,6 +592,34 @@ async fn hydrate_partition_log(
     Ok(())
 }
 
+fn validate_recovered_segment(
+    stream_id: usize,
+    topic_id: usize,
+    partition_id: usize,
+    segment: &iggy_common::Segment,
+    storage: &iggy_common::SegmentStorage,
+    indexes: Option<&server::streaming::segments::IggyIndexesMut>,
+) -> Result<(), ServerNgError> {
+    let messages_size_bytes = storage
+        .messages_reader
+        .as_ref()
+        .map_or(0, |reader| u64::from(reader.file_size()));
+    let indexed_size_bytes = indexes.map_or(0, |indexes| 
u64::from(indexes.messages_size()));
+    if messages_size_bytes == indexed_size_bytes {
+        return Ok(());
+    }
+
+    Err(ServerNgError::RecoveredSegmentSizeDivergence {
+        stream_id,
+        topic_id,
+        partition_id,
+        start_offset: segment.start_offset,
+        end_offset: segment.end_offset,
+        messages_size_bytes,
+        indexed_size_bytes,
+    })
+}
+
 fn convert_segment(segment: &iggy_common::Segment, max_timestamp: u64) -> 
Segment {
     Segment {
         sealed: segment.sealed,
diff --git a/core/server-ng/src/server_error.rs 
b/core/server-ng/src/server_error.rs
index 77e17ee1c..664036bf2 100644
--- a/core/server-ng/src/server_error.rs
+++ b/core/server-ng/src/server_error.rs
@@ -55,6 +55,18 @@ pub enum ServerNgError {
     MissingReplicaId,
     #[error("cluster node for replica {replica_id} is missing tcp_replica 
port")]
     ClusterReplicaPortMissing { replica_id: u8 },
+    #[error(
+        "recovered segment for stream {stream_id}, topic {topic_id}, partition 
{partition_id} at start_offset {start_offset} has message/index divergence 
(messages_size={messages_size_bytes}, indexed_size={indexed_size_bytes}, 
end_offset={end_offset})"
+    )]
+    RecoveredSegmentSizeDivergence {
+        stream_id: usize,
+        topic_id: usize,
+        partition_id: usize,
+        start_offset: u64,
+        end_offset: u64,
+        messages_size_bytes: u64,
+        indexed_size_bytes: u64,
+    },
     #[error(
         "failed to load persisted {consumer_kind} offsets for stream 
{stream_id}, topic {topic_id}, partition {partition_id} from {path}"
     )]

Reply via email to