dracoooooo commented on code in PR #1552: URL: https://github.com/apache/horaedb/pull/1552#discussion_r1715338344
########## src/wal/src/local_storage_impl/segment.rs: ########## @@ -0,0 +1,741 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{ + collections::{HashMap, VecDeque}, + fmt::Debug, + fs, + fs::{File, OpenOptions}, + io, + io::Write, + path::Path, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, Mutex, + }, +}; + +use bytes_ext::{BufMut, BytesMut}; +use common_types::{table::TableId, SequenceNumber, MAX_SEQUENCE_NUMBER, MIN_SEQUENCE_NUMBER}; +use crc32fast::Hasher; +use generic_error::{BoxError, GenericError}; +use macros::define_result; +use memmap2::{MmapMut, MmapOptions}; +use runtime::Runtime; +use snafu::{ensure, Backtrace, ResultExt, Snafu}; + +use crate::{ + kv_encoder::{CommonLogEncoding, CommonLogKey}, + log_batch::{LogEntry, LogWriteBatch}, + manager::{ + BatchLogIteratorAdapter, Read, ReadContext, ReadRequest, RegionId, ScanContext, + ScanRequest, SyncLogIterator, WalLocation, WriteContext, + }, +}; + +#[derive(Debug, Snafu)] +pub enum Error { + #[snafu(display("Failed to open or create file: {}", source))] + FileOpen { source: io::Error }, + + #[snafu(display("Failed to map file to memory: {}", source))] + Mmap { source: io::Error }, + + #[snafu(display("Segment full"))] + SegmentFull, + + #[snafu(display("Failed to append data to segment file: {}", source))] + SegmentAppend { source: io::Error }, + + #[snafu(display("Failed to flush mmap: {}", source))] + Flush { source: io::Error }, + + #[snafu(display( + "Attempted to read beyond segment size. Offset: {}, Size: {}, FileSize: {}", + offset, + size, + file_size + ))] + ReadOutOfBounds { + offset: u64, + size: u64, + file_size: u64, + }, + + #[snafu(display("Invalid segment header"))] + InvalidHeader, + + #[snafu(display("Segment not open, id:{}", id))] + SegmentNotOpen { id: u64 }, + + #[snafu(display("Segment not found, id:{}", id))] + SegmentNotFound { id: u64 }, + + #[snafu(display("Unable to convert slice: {}", source))] + Conversion { + source: std::array::TryFromSliceError, + }, + + #[snafu(display("{}", source))] + Encoding { source: GenericError }, + + #[snafu(display("Invalid record: {}, backtrace:\n{}", source, backtrace))] + InvalidRecord { + source: GenericError, + backtrace: Backtrace, + }, + + #[snafu(display("Length mismatch: expected {} but found {}", expected, actual))] + LengthMismatch { expected: usize, actual: usize }, + + #[snafu(display("Checksum mismatch: expected {}, but got {}", expected, actual))] + ChecksumMismatch { expected: u32, actual: u32 }, +} + +define_result!(Error); + +const HEADER: &str = "HoraeDB WAL"; +const CRC_SIZE: usize = 4; +const RECORD_LENGTH_SIZE: usize = 4; +const KEY_LENGTH_SIZE: usize = 2; +const VALUE_LENGTH_SIZE: usize = 4; +// todo: make MAX_FILE_SIZE configurable +const MAX_FILE_SIZE: u64 = 64 * 1024 * 1024; + +#[derive(Debug)] +pub struct Segment { + path: String, + id: u64, + size: u64, + min_seq: SequenceNumber, + max_seq: SequenceNumber, + is_open: bool, + file: Option<File>, + mmap: Option<MmapMut>, + record_position: Option<Vec<Position>>, +} + +#[derive(Debug, Clone)] +pub struct Position { + start: u64, + end: u64, +} + +impl Segment { + pub fn new(path: String, segment_id: u64) -> Result<Segment> { + if !Path::new(&path).exists() { + let mut file = File::create(&path).context(FileOpen)?; + file.write_all(HEADER.as_bytes()).context(FileOpen)?; + } + Ok(Segment { + path, + id: segment_id, + size: HEADER.len() as u64, + is_open: false, + min_seq: MAX_SEQUENCE_NUMBER, + max_seq: MIN_SEQUENCE_NUMBER, + file: None, + mmap: None, + record_position: None, + }) + } + + pub fn open(&mut self) -> Result<()> { + let file = OpenOptions::new() + .read(true) + .append(true) + .open(&self.path) + .context(FileOpen)?; + + let metadata = file.metadata().context(FileOpen)?; + let size = metadata.len(); + + let mmap = unsafe { MmapOptions::new().map_mut(&file).context(Mmap)? }; + + // Validate segment header + let header_len = HEADER.len(); + ensure!(size >= header_len as u64, InvalidHeader); + + let header_bytes = &mmap[0..header_len]; + let header_str = std::str::from_utf8(header_bytes).map_err(|_| Error::InvalidHeader)?; + + ensure!(header_str == HEADER, InvalidHeader); + + // Read and validate all records + let mut pos = header_len; + let mut record_position = Vec::new(); + + while pos < size as usize { + ensure!( + pos + CRC_SIZE + RECORD_LENGTH_SIZE <= size as usize, + LengthMismatch { + expected: pos + CRC_SIZE + RECORD_LENGTH_SIZE, + actual: size as usize + } + ); + + // Read the CRC + let crc = u32::from_le_bytes(mmap[pos..pos + CRC_SIZE].try_into().context(Conversion)?); + pos += CRC_SIZE; + + // Read the length + let length = u32::from_le_bytes( + mmap[pos..pos + RECORD_LENGTH_SIZE] + .try_into() + .context(Conversion)?, + ); + pos += RECORD_LENGTH_SIZE; + + // Ensure the entire record is within the bounds of the mmap + ensure!( + pos + length as usize <= size as usize, + LengthMismatch { + expected: pos + length as usize, + actual: size as usize + } + ); + + // Verify the checksum (CRC32 of the data) + let data = &mmap[pos..pos + length as usize]; + let computed_crc = crc32fast::hash(data); + ensure!( + computed_crc == crc, + ChecksumMismatch { + expected: crc, + actual: computed_crc + } + ); + + record_position.push(Position { + start: (pos - CRC_SIZE - RECORD_LENGTH_SIZE) as u64, + end: (pos + length as usize) as u64, + }); + // Move to the next record + pos += length as usize; + } + + self.is_open = true; + self.file = Some(file); + self.mmap = Some(mmap); + self.record_position = Some(record_position); + self.size = size; + Ok(()) + } + + pub fn close(&mut self) -> Result<()> { + self.is_open = false; + self.file.take(); + self.mmap.take(); + self.record_position.take(); + Ok(()) + } + + pub fn append(&mut self, data: &[u8]) -> Result<()> { + ensure!(self.is_open, SegmentNotOpen { id: self.id }); + ensure!(self.size + data.len() as u64 <= MAX_FILE_SIZE, SegmentFull); Review Comment: Currently, reading and writing have only been implemented on a single segment file. Reading and writing across multiple segments have not yet been implemented. I’ll leave a TODO here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
