jiacai2050 commented on code in PR #1552:
URL: https://github.com/apache/horaedb/pull/1552#discussion_r1718105048


##########
src/wal/src/local_storage_impl/record_encoding.rs:
##########
@@ -0,0 +1,276 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use bytes_ext::{Buf, BufMut, SafeBuf, SafeBufMut};
+use codec::Encoder;
+use generic_error::GenericError;
+use macros::define_result;
+use snafu::{ensure, Backtrace, ResultExt, Snafu};
+
+pub const RECORD_ENCODING_V0: u8 = 0;
+pub const NEWEST_RECORD_ENCODING_VERSION: u8 = RECORD_ENCODING_V0;
+
+pub const VERSION_SIZE: usize = 1;
+pub const CRC_SIZE: usize = 4;
+pub const RECORD_LENGTH_SIZE: usize = 4;
+pub const KEY_LENGTH_SIZE: usize = 4;
+pub const VALUE_LENGTH_SIZE: usize = 4;
+
+#[derive(Debug, Snafu)]
+pub enum Error {
+    #[snafu(display("Version mismatch, expect:{}, given:{}", expected, given))]
+    Version { expected: u8, given: u8 },
+
+    #[snafu(display("Failed to encode, err:{}", source))]
+    Encoding { source: bytes_ext::Error },
+
+    #[snafu(display("Failed to decode, err:{}", source))]
+    Decoding { source: bytes_ext::Error },
+
+    #[snafu(display("Invalid record: {}, backtrace:\n{}", source, backtrace))]
+    InvalidRecord {
+        source: GenericError,
+        backtrace: Backtrace,
+    },
+
+    #[snafu(display("Length mismatch: expected {} but found {}", expected, 
actual))]
+    LengthMismatch { expected: usize, actual: usize },
+
+    #[snafu(display("Checksum mismatch: expected {}, but got {}", expected, 
actual))]
+    ChecksumMismatch { expected: u32, actual: u32 },
+}
+
+define_result!(Error);
+
+/// Record format:
+///
+/// ```text
+/// +---------+--------+--------+------------+-----+--------------+-------+
+/// | version |  crc   | length | key length | key | value length | value |
+/// |  (u8)   | (u32)  | (u32)  |   (u32)    |     |     (u32)    |       |
+/// +---------+--------+--------+------------+-----+--------------+-------+
+/// ```
+pub struct Record<'a> {
+    /// The version number of the record.
+    pub version: u8,
+
+    /// The CRC checksum of the record.
+    pub crc: u32,
+
+    /// The length of the record (excluding version, crc and length).
+    pub length: u32,
+
+    /// The length of the key in bytes.
+    pub key_length: u32,

Review Comment:
   Our key is fixed-size now (`table_id + seq_num`), so I think we don't need
   the key length field.



##########
src/wal/src/local_storage_impl/segment.rs:
##########
@@ -0,0 +1,790 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{
+    collections::{HashMap, VecDeque},
+    fmt::Debug,
+    fs,
+    fs::{File, OpenOptions},
+    io,
+    io::Write,
+    path::Path,
+    sync::{
+        atomic::{AtomicU64, Ordering},
+        Arc, Mutex,
+    },
+};
+
+use bytes_ext::BytesMut;
+use codec::Encoder;
+use common_types::{table::TableId, SequenceNumber, MAX_SEQUENCE_NUMBER, 
MIN_SEQUENCE_NUMBER};
+use generic_error::{BoxError, GenericError};
+use macros::define_result;
+use memmap2::{MmapMut, MmapOptions};
+use runtime::Runtime;
+use snafu::{ensure, Backtrace, ResultExt, Snafu};
+
+use crate::{
+    kv_encoder::{CommonLogEncoding, CommonLogKey},
+    local_storage_impl::record_encoding::{Record, RecordEncoding},
+    log_batch::{LogEntry, LogWriteBatch},
+    manager::{
+        BatchLogIteratorAdapter, Read, ReadContext, ReadRequest, RegionId, 
ScanContext,
+        ScanRequest, SyncLogIterator, WalLocation, WriteContext,
+    },
+};
+
+#[derive(Debug, Snafu)]
+pub enum Error {
+    #[snafu(display("Failed to open or create file: {}", source))]
+    FileOpen { source: io::Error },
+
+    #[snafu(display("Failed to map file to memory: {}", source))]
+    Mmap { source: io::Error },
+
+    #[snafu(display("Segment full"))]
+    SegmentFull,
+
+    #[snafu(display("Failed to append data to segment file: {}", source))]
+    SegmentAppend { source: io::Error },
+
+    #[snafu(display("Failed to flush mmap: {}", source))]
+    Flush { source: io::Error },
+
+    #[snafu(display(
+        "Attempted to read beyond segment size. Offset: {}, Size: {}, 
FileSize: {}",
+        offset,
+        size,
+        file_size
+    ))]
+    ReadOutOfBounds {
+        offset: u64,
+        size: u64,
+        file_size: u64,
+    },
+
+    #[snafu(display("Invalid segment header"))]
+    InvalidHeader,
+
+    #[snafu(display("Segment not open, id:{}", id))]
+    SegmentNotOpen { id: u64 },
+
+    #[snafu(display("Segment not found, id:{}", id))]
+    SegmentNotFound { id: u64 },
+
+    #[snafu(display("Unable to convert slice: {}", source))]
+    Conversion {
+        source: std::array::TryFromSliceError,
+    },
+
+    #[snafu(display("{}", source))]
+    Encoding { source: GenericError },
+
+    #[snafu(display("Invalid record: {}, backtrace:\n{}", source, backtrace))]
+    InvalidRecord {
+        source: GenericError,
+        backtrace: Backtrace,
+    },
+
+    #[snafu(display("Length mismatch: expected {} but found {}", expected, 
actual))]
+    LengthMismatch { expected: usize, actual: usize },
+
+    #[snafu(display("Checksum mismatch: expected {}, but got {}", expected, 
actual))]
+    ChecksumMismatch { expected: u32, actual: u32 },
+}
+
+define_result!(Error);
+
+const HEADER: &str = "HoraeDB WAL";
+// todo: make MAX_FILE_SIZE configurable
+const MAX_FILE_SIZE: u64 = 64 * 1024 * 1024;
+
+#[derive(Debug)]
+pub struct Segment {
+    /// The file path of the segment.
+    path: String,
+
+    /// A unique identifier for the segment.
+    id: u64,
+
+    /// The size of the segment in bytes.
+    size: u64,
+
+    /// The minimum sequence number of records within this segment.
+    min_seq: SequenceNumber,
+
+    /// The maximum sequence number of records within this segment.
+    max_seq: SequenceNumber,
+
+    /// The encoding format used for records within this segment.
+    record_encoding: RecordEncoding,
+
+    /// An optional file handle for the segment.
+    /// This may be `None` if the file is not currently open.
+    file: Option<File>,
+
+    /// An optional memory-mapped mutable buffer of the segment.
+    /// This may be `None` if the segment is not memory-mapped.
+    mmap: Option<MmapMut>,
+
+    /// An optional vector of positions within the segment.
+    /// This may be `None` if the segment is not memory-mapped.
+    record_position: Option<Vec<Position>>,
+}
+
+#[derive(Debug, Clone)]
+pub struct Position {
+    start: u64,
+    end: u64,
+}
+
+/// Segment file format:
+///
+/// ```text
+/// +----------+--------+--------+---+--------+

Review Comment:
   The segment file also needs a version field.



##########
src/wal/src/local_storage_impl/segment.rs:
##########
@@ -0,0 +1,790 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{
+    collections::{HashMap, VecDeque},
+    fmt::Debug,
+    fs,
+    fs::{File, OpenOptions},
+    io,
+    io::Write,
+    path::Path,
+    sync::{
+        atomic::{AtomicU64, Ordering},
+        Arc, Mutex,
+    },
+};
+
+use bytes_ext::BytesMut;
+use codec::Encoder;
+use common_types::{table::TableId, SequenceNumber, MAX_SEQUENCE_NUMBER, 
MIN_SEQUENCE_NUMBER};
+use generic_error::{BoxError, GenericError};
+use macros::define_result;
+use memmap2::{MmapMut, MmapOptions};
+use runtime::Runtime;
+use snafu::{ensure, Backtrace, ResultExt, Snafu};
+
+use crate::{
+    kv_encoder::{CommonLogEncoding, CommonLogKey},
+    local_storage_impl::record_encoding::{Record, RecordEncoding},
+    log_batch::{LogEntry, LogWriteBatch},
+    manager::{
+        BatchLogIteratorAdapter, Read, ReadContext, ReadRequest, RegionId, 
ScanContext,
+        ScanRequest, SyncLogIterator, WalLocation, WriteContext,
+    },
+};
+
+#[derive(Debug, Snafu)]
+pub enum Error {
+    #[snafu(display("Failed to open or create file: {}", source))]
+    FileOpen { source: io::Error },
+
+    #[snafu(display("Failed to map file to memory: {}", source))]
+    Mmap { source: io::Error },
+
+    #[snafu(display("Segment full"))]
+    SegmentFull,
+
+    #[snafu(display("Failed to append data to segment file: {}", source))]
+    SegmentAppend { source: io::Error },
+
+    #[snafu(display("Failed to flush mmap: {}", source))]
+    Flush { source: io::Error },
+
+    #[snafu(display(
+        "Attempted to read beyond segment size. Offset: {}, Size: {}, 
FileSize: {}",
+        offset,
+        size,
+        file_size
+    ))]
+    ReadOutOfBounds {
+        offset: u64,
+        size: u64,
+        file_size: u64,
+    },
+
+    #[snafu(display("Invalid segment header"))]
+    InvalidHeader,
+
+    #[snafu(display("Segment not open, id:{}", id))]
+    SegmentNotOpen { id: u64 },
+
+    #[snafu(display("Segment not found, id:{}", id))]
+    SegmentNotFound { id: u64 },
+
+    #[snafu(display("Unable to convert slice: {}", source))]
+    Conversion {
+        source: std::array::TryFromSliceError,
+    },
+
+    #[snafu(display("{}", source))]
+    Encoding { source: GenericError },
+
+    #[snafu(display("Invalid record: {}, backtrace:\n{}", source, backtrace))]
+    InvalidRecord {
+        source: GenericError,
+        backtrace: Backtrace,
+    },
+
+    #[snafu(display("Length mismatch: expected {} but found {}", expected, 
actual))]
+    LengthMismatch { expected: usize, actual: usize },
+
+    #[snafu(display("Checksum mismatch: expected {}, but got {}", expected, 
actual))]
+    ChecksumMismatch { expected: u32, actual: u32 },
+}
+
+define_result!(Error);
+
+const HEADER: &str = "HoraeDB WAL";

Review Comment:
   Usually we define it like this:
   ```suggestion
   const SEGMENT_HEADER: &[u8] = b"HoraeDBWAL";
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to