atharvalade commented on code in PR #2916:
URL: https://github.com/apache/iggy/pull/2916#discussion_r2919908678


##########
core/journal/src/metadata_journal.rs:
##########
@@ -0,0 +1,686 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::file_storage::FileStorage;
+use crate::{Journal, JournalHandle};
+use bytes::Bytes;
+use iggy_common::header::{Command2, PrepareHeader};
+use iggy_common::message::Message;
+use std::cell::{Cell, Ref, RefCell};
+use std::fmt;
+use std::fs;
+use std::io;
+use std::io::Write;
+use std::path::Path;
+
+const HEADER_SIZE: usize = size_of::<PrepareHeader>();
+
+/// Maximum allowed size for a single WAL entry (64 MiB).
+///
+/// A header with `size` exceeding this limit is treated as corrupt. This
+/// prevents a bit-flipped size field (e.g. `0xFFFF_FFFF`) from causing a
+/// multi-GiB allocation during the WAL scan.
+const MAX_ENTRY_SIZE: u64 = 64 * 1024 * 1024;
+
+/// Number of slots in the journal ring buffer.
+///
+/// Must be larger than the maximum number of entries between consecutive
+/// snapshots. If the journal wraps past this window, older un-snapshotted
+/// entries are silently evicted from the in-memory index (the WAL file
+/// still contains them, but they become unreachable for recovery).
+///
+/// **NOTE:** This value must be chosen in balance with
+/// [`core::consensus::pipeline_prepare_queue_max`], because it controls
+/// how many committed-but-not-yet-snapshotted entries the buffer can
+/// hold. It may need further tuning.
+pub const SLOT_COUNT: usize = 1024;
+
+/// Error type for journal operations.
+#[derive(Debug)]
+#[allow(clippy::module_name_repetitions)]
+pub enum JournalError {
+    Io(io::Error),
+}
+
+impl fmt::Display for JournalError {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match self {
+            Self::Io(e) => write!(f, "journal I/O error: {e}"),
+        }
+    }
+}
+
+impl std::error::Error for JournalError {
+    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
+        match self {
+            Self::Io(e) => Some(e),
+        }
+    }
+}
+
+impl From<io::Error> for JournalError {
+    fn from(e: io::Error) -> Self {
+        Self::Io(e)
+    }
+}
+
+/// Persistent metadata journal backed by an append-only WAL file.
+///
+/// Each WAL entry is a raw `Message<PrepareHeader>`:
+/// `[PrepareHeader: 256 bytes][body: header.size - 256 bytes]`
+///
+/// The in-memory index is a fixed-size slot array indexed by
+/// `op % SLOT_COUNT`.
+pub struct MetadataJournal {
+    storage: FileStorage,
+    headers: RefCell<Vec<Option<PrepareHeader>>>,
+    offsets: RefCell<Vec<Option<u64>>>,
+    last_op: Cell<Option<u64>>,
+    /// Highest op that has been durably snapshotted. Entries with
+    /// `op <= snapshot_op` are safe to evict from the slot array. Appending
+    /// an entry that would evict an un-snapshotted entry (op > `snapshot_op`)
+    /// panics, and the upper layer must take a snapshot before the journal
+    /// wraps.
+    snapshot_op: Cell<u64>,
+}
+
+impl fmt::Debug for MetadataJournal {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("MetadataJournal")
+            .field("write_offset", &self.storage.file_len())
+            .field("last_op", &self.last_op.get())
+            .finish_non_exhaustive()
+    }
+}
+
+#[allow(clippy::cast_possible_truncation)]
+const fn slot_for_op(op: u64) -> usize {
+    op as usize % SLOT_COUNT
+}
+
+#[allow(clippy::cast_possible_truncation)]
+impl MetadataJournal {
+    /// Open the WAL file, scanning forward to rebuild the in-memory index.
+    ///
+    /// `snapshot_op` is the highest op that has been durably snapshotted.
+    /// It must be provided so that `append()` can detect slot collisions
+    /// that would evict un-snapshotted entries.
+    ///
+    /// If a truncated entry is found at the tail (crash during write),
+    /// the file is truncated to the last complete entry.
+    ///
+    /// # Errors
+    /// Returns `JournalError::Io` if the WAL file cannot be opened or read.
+    pub fn open(path: &Path, snapshot_op: u64) -> Result<Self, JournalError> {
+        let storage = FileStorage::open(path)?;
+        let file_len = storage.file_len();
+        let mut headers: Vec<Option<PrepareHeader>> = vec![None; SLOT_COUNT];
+        let mut offsets: Vec<Option<u64>> = vec![None; SLOT_COUNT];
+        let mut last_op: Option<u64> = None;
+        let mut pos: u64 = 0;
+        let mut header_buf = vec![0u8; HEADER_SIZE];
+
+        while pos + HEADER_SIZE as u64 <= file_len {
+            // Read the 256-byte header
+            storage.read_sync(pos, &mut header_buf)?;
+            let header: PrepareHeader = *bytemuck::checked::from_bytes(&header_buf);
+
+            // Validate: must be a Prepare command with sane size
+            if header.command != Command2::Prepare
+                || (header.size as usize) < HEADER_SIZE
+                || u64::from(header.size) > MAX_ENTRY_SIZE
+            {
+                // Corrupt or non-prepare entry, truncate here
+                storage.truncate(pos)?;
+                break;
+            }
+
+            let entry_size = u64::from(header.size);
+
+            // Check if the full entry fits
+            if pos + entry_size > file_len {
+                // Truncated entry at tail.
+                // This handles the case where a crash happened mid-write and
+                // only the header was persisted but the body was not, so we
+                // truncate the file back to the start of the entry.
+                storage.truncate(pos)?;
+                break;
+            }
+
+            let slot = slot_for_op(header.op);
+
+            // Note: if the WAL contains duplicate entries for the same op,
+            // the latest one encountered wins.
+            headers[slot] = Some(header);
+            offsets[slot] = Some(pos);
+
+            match last_op {
+                Some(current) if header.op > current => last_op = Some(header.op),
+                None => last_op = Some(header.op),
+                _ => {}
+            }
+
+            pos += entry_size;
+        }
+
+        // If there are leftover bytes less than a header, truncate them
+        if pos < storage.file_len() {
+            storage.truncate(pos)?;
+        }
+
+        Ok(Self {
+            storage,
+            headers: RefCell::new(headers),
+            offsets: RefCell::new(offsets),
+            last_op: Cell::new(last_op),
+            snapshot_op: Cell::new(snapshot_op),
+        })
+    }
+
+    /// Return headers with `op >= from_op`, sorted by op.
+    pub fn iter_headers_from(&self, from_op: u64) -> Vec<PrepareHeader> {
+        let headers = self.headers.borrow();
+        let mut result: Vec<PrepareHeader> = headers
+            .iter()
+            .filter_map(|slot| slot.filter(|h| h.op >= from_op))
+            .collect();
+        result.sort_unstable_by_key(|h| h.op);
+        result
+    }
+
+    /// Highest op number in the index, or `None` if empty.
+    pub const fn last_op(&self) -> Option<u64> {
+        self.last_op.get()
+    }
+
+    /// Advance the snapshot watermark. The caller must ensure `op` is
+    /// monotonically increasing and corresponds to a durable snapshot.
+    ///
+    /// # Panics
+    /// Panics if `op` is less than the current snapshot watermark.
+    pub fn set_snapshot_op(&self, op: u64) {
+        assert!(
+            op >= self.snapshot_op.get(),
+            "snapshot_op must be monotonically increasing: {} -> {}",
+            self.snapshot_op.get(),
+            op
+        );
+        self.snapshot_op.set(op);
+    }
+
+    /// Access the underlying storage (for fsync in tests, etc.).
+    pub const fn storage_ref(&self) -> &FileStorage {
+        &self.storage
+    }
+
+    /// Synchronous entry read for recovery.
+    ///
+    /// Returns `Ok(None)` if the op is not in the index.
+    ///
+    /// # Errors
+    /// Returns an I/O error if the read fails or the entry is malformed.
+    pub fn entry_sync(&self, header: &PrepareHeader) -> io::Result<Option<Message<PrepareHeader>>> {
+        let offsets = self.offsets.borrow();
+        let slot = slot_for_op(header.op);
+        let Some(offset) = offsets[slot] else {
+            return Ok(None);
+        };
+        let size = header.size as usize;
+        drop(offsets);
+        let mut buf = vec![0u8; size];
+        self.storage.read_sync(offset, &mut buf)?;
+        let msg = Message::from_bytes(Bytes::from(buf))
+            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))?;
+        Ok(Some(msg))
+    }
+}
+
+#[allow(
+    clippy::cast_possible_truncation,
+    clippy::cast_sign_loss,
+    clippy::future_not_send
+)]
+impl Journal<FileStorage> for MetadataJournal {
+    type Header = PrepareHeader;
+    type Entry = Message<PrepareHeader>;
+    type HeaderRef<'a> = Ref<'a, PrepareHeader>;
+
+    fn header(&self, idx: usize) -> Option<Self::HeaderRef<'_>> {
+        let headers = self.headers.borrow();
+        Ref::filter_map(headers, |h| {
+            let slot = slot_for_op(idx as u64);
+            let header = h[slot].as_ref()?;
+            if header.op == idx as u64 {
+                Some(header)
+            } else {
+                None
+            }
+        })
+        .ok()
+    }
+
+    fn previous_header(&self, header: &Self::Header) -> Option<Self::HeaderRef<'_>> {
+        if header.op == 0 {
+            return None;
+        }
+        self.header((header.op - 1) as usize)
+    }
+
+    fn set_snapshot_op(&self, op: u64) {
+        Self::set_snapshot_op(self, op);
+    }
+
+    fn remaining_capacity(&self) -> Option<usize> {
+        let Some(last) = self.last_op.get() else {
+            return Some(SLOT_COUNT);
+        };
+        let snapshot = self.snapshot_op.get();
+        let used = (last - snapshot) as usize;
+        Some(SLOT_COUNT.saturating_sub(used))
+    }

Review Comment:
   `let used = (last - snapshot) as usize` will underflow if `last_op < 
snapshot_op`, which is possible after a partial WAL recovery where the snapshot 
is ahead of the truncated WAL (e.g. snapshot at op 100 but WAL only has entries 
up to op 50).
   
In debug builds this panics. In release builds it wraps to a huge value, 
`saturating_sub` returns 0, and an unnecessary forced checkpoint is triggered. 
Using `last.saturating_sub(snapshot)`, or adding an explicit guard for this 
case, would fix it.



##########
core/metadata/src/impls/metadata.rs:
##########
@@ -325,3 +404,31 @@ where
         send_prepare_ok_common(consensus, header, Some(persisted)).await;
     }
 }
+
+#[allow(unused)]
+impl<C, J, S, M> IggyMetadata<C, J, S, M> {
+    /// Create a snapshot from the current state machine and persist it to disk.
+    ///
+    /// After the snapshot is durably persisted, advances the journal's
+    /// snapshot watermark so that entries at or below `last_op` may be
+    /// evicted from the ring buffer on future appends.
+    ///
+    /// # Errors
+    /// Returns `SnapshotError` if snapshotting, persistence, or compaction fails.
+    pub fn checkpoint(&self, data_dir: &Path, last_op: u64) -> Result<(), SnapshotError>
+    where
+        M: FillSnapshot<MetadataSnapshot>,
+        J: JournalHandle,
+    {
+        let snapshot = IggySnapshot::create(&self.mux_stm, last_op)?;
+        let path = data_dir.join(super::METADATA_DIR).join("snapshot.bin");
+        snapshot.persist(&path)?;
+
+        if let Some(journal) = &self.journal {
+            journal.handle().set_snapshot_op(last_op);
+            journal.handle().compact().map_err(SnapshotError::Io)?;

Review Comment:
   
   `IggySnapshot::create` labels the snapshot with `sequence_number = last_op` 
(which is `current_op` from the caller), but the state machine has only applied 
entries up to the commit number, not up to the latest prepare.
   
   On recovery, `replay_from` is computed as `snapshot.sequence_number() + 1`, 
so entries between the real applied op and `current_op` get skipped and are 
never replayed into the state machine. Same root cause as issue 1: `last_op` 
should be the commit number.
   



##########
core/metadata/src/impls/metadata.rs:
##########
@@ -151,10 +198,36 @@ where
 
         // TODO add assertions for valid state here.
 
-        // TODO verify that the current prepare fits in the WAL.
-
         // TODO handle gap in ops.
 
+        // Force a checkpoint if the journal is running low on capacity.
+        if journal
+            .handle()
+            .remaining_capacity()
+            .is_some_and(|c| c <= CHECKPOINT_MARGIN)
+        {
+            if let Some(data_dir) = &self.data_dir {
+                let snap_op = current_op;
+                if let Err(e) = self.checkpoint(data_dir, snap_op) {
+                    error!(

Review Comment:
   `snap_op` is set to `current_op`, which is the last prepared op, not the 
last committed op. When `checkpoint()` runs with this value it calls 
`set_snapshot_op` and `compact()`, which removes all WAL entries at or below 
that op.
   
   But entries between `commit_number + 1` and `current_op` are still in the 
pipeline awaiting quorum. When `on_ack` later tries to look them up via 
`journal.handle().entry()` at line 300, it hits the `unwrap_or_else(|| 
panic!(...))` because the entries were already compacted away.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to