This is an automated email from the ASF dual-hosted git repository. numinnex pushed a commit to branch integration_tests in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 4b8dbbbfec49fb5cc9f2d35f0b744c8847b6c550 Author: numinex <[email protected]> AuthorDate: Wed May 6 10:38:35 2026 +0200 address code review nits --- core/server-ng/src/bootstrap.rs | 39 ++++++++++++++++++++++++++++++++++++++ core/server-ng/src/server_error.rs | 12 ++++++++++++ 2 files changed, 51 insertions(+) diff --git a/core/server-ng/src/bootstrap.rs b/core/server-ng/src/bootstrap.rs index 816a464f5..2c63fa753 100644 --- a/core/server-ng/src/bootstrap.rs +++ b/core/server-ng/src/bootstrap.rs @@ -511,6 +511,17 @@ async fn hydrate_partition_log( .zip(loaded_log.storages().iter().cloned()) .enumerate() { + validate_recovered_segment( + stream_id, + topic_id, + partition_id, + segment, + &storage, + loaded_log + .indexes() + .get(segment_index) + .and_then(|indexes| indexes.as_ref()), + )?; let max_timestamp = match loaded_log .indexes() .get(segment_index) @@ -581,6 +592,34 @@ async fn hydrate_partition_log( Ok(()) } +fn validate_recovered_segment( + stream_id: usize, + topic_id: usize, + partition_id: usize, + segment: &iggy_common::Segment, + storage: &iggy_common::SegmentStorage, + indexes: Option<&server::streaming::segments::IggyIndexesMut>, +) -> Result<(), ServerNgError> { + let messages_size_bytes = storage + .messages_reader + .as_ref() + .map_or(0, |reader| u64::from(reader.file_size())); + let indexed_size_bytes = indexes.map_or(0, |indexes| u64::from(indexes.messages_size())); + if messages_size_bytes == indexed_size_bytes { + return Ok(()); + } + + Err(ServerNgError::RecoveredSegmentSizeDivergence { + stream_id, + topic_id, + partition_id, + start_offset: segment.start_offset, + end_offset: segment.end_offset, + messages_size_bytes, + indexed_size_bytes, + }) +} + fn convert_segment(segment: &iggy_common::Segment, max_timestamp: u64) -> Segment { Segment { sealed: segment.sealed, diff --git a/core/server-ng/src/server_error.rs b/core/server-ng/src/server_error.rs index 77e17ee1c..664036bf2 100644 --- a/core/server-ng/src/server_error.rs +++ b/core/server-ng/src/server_error.rs @@ -55,6 +55,18 @@ pub enum ServerNgError { MissingReplicaId, #[error("cluster node for replica {replica_id} is missing tcp_replica port")] ClusterReplicaPortMissing { replica_id: u8 }, + #[error( + "recovered segment for stream {stream_id}, topic {topic_id}, partition {partition_id} at start_offset {start_offset} has message/index divergence (messages_size={messages_size_bytes}, indexed_size={indexed_size_bytes}, end_offset={end_offset})" + )] + RecoveredSegmentSizeDivergence { + stream_id: usize, + topic_id: usize, + partition_id: usize, + start_offset: u64, + end_offset: u64, + messages_size_bytes: u64, + indexed_size_bytes: u64, + }, #[error( "failed to load persisted {consumer_kind} offsets for stream {stream_id}, topic {topic_id}, partition {partition_id} from {path}" )]
