This is an automated email from the ASF dual-hosted git repository.

jiacai2050 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/horaedb.git


The following commit(s) were added to refs/heads/main by this push:
     new 8a288405 feat: Add a new disk-based WAL implementation for standalone 
deployment (#1552)
8a288405 is described below

commit 8a2884052b4c02d132fd2c30fa6169b84a9d2ffd
Author: Draco <[email protected]>
AuthorDate: Fri Aug 16 14:18:36 2024 +0800

    feat: Add a new disk-based WAL implementation for standalone deployment 
(#1552)
    
    ## Rationale
    
    #1279
    
    ## Detailed Changes
    
    1. Added a struct `Segment` responsible for reading and writing segment
    files; it records the offset of each record.
    2. Added a struct `SegmentManager` responsible for managing all segments,
    including:
            1.      Reading all segments from the folder upon creation.
            2.      Writing only to the segment with the largest ID.
            3.      Maintaining a cache where segments not in the cache are
            closed, while segments in the cache have their files open and
            are memory-mapped using mmap.
    3. Implemented the `WalManager` trait.
    
    ## Test Plan
    
    Unit tests.
---
 Cargo.lock                                         |  19 +-
 integration_tests/Makefile                         |   2 +-
 src/analytic_engine/Cargo.toml                     |   1 +
 src/horaedb/Cargo.toml                             |   3 +-
 src/horaedb/src/setup.rs                           |  16 +
 src/wal/Cargo.toml                                 |   5 +-
 src/wal/src/config.rs                              |   7 +
 src/wal/src/lib.rs                                 |   2 +
 .../src/{lib.rs => local_storage_impl/config.rs}   |  30 +-
 src/wal/src/{lib.rs => local_storage_impl/mod.rs}  |  18 +-
 src/wal/src/local_storage_impl/record_encoding.rs  | 275 +++++++
 src/wal/src/local_storage_impl/segment.rs          | 900 +++++++++++++++++++++
 src/wal/src/local_storage_impl/wal_manager.rs      | 195 +++++
 src/wal/tests/read_write.rs                        |  24 +
 14 files changed, 1461 insertions(+), 36 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 04edee33..c904497a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1710,9 +1710,9 @@ dependencies = [
 
 [[package]]
 name = "crc32fast"
-version = "1.3.2"
+version = "1.4.2"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "b540bd8bc810d3885c6ea91e2018302f68baba2129ab3e88f32389ee9370880d"
+checksum = "a97769d94ddab943e4510d138150169a2758b5ef3eb191a9ee688de3e23ef7b3"
 dependencies = [
  "cfg-if 1.0.0",
 ]
@@ -3932,6 +3932,15 @@ dependencies = [
  "libc",
 ]
 
+[[package]]
+name = "memmap2"
+version = "0.9.4"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "fe751422e4a8caa417e13c3ea66452215d7d63e19e604f4980461212f3ae1322"
+dependencies = [
+ "libc",
+]
+
 [[package]]
 name = "memoffset"
 version = "0.6.5"
@@ -6892,7 +6901,7 @@ source = 
"registry+https://github.com/rust-lang/crates.io-index";
 checksum = "167a4ffd7c35c143fd1030aa3c2caf76ba42220bd5a6b5f4781896434723b8c3"
 dependencies = [
  "debugid",
- "memmap2",
+ "memmap2 0.5.10",
  "stable_deref_trait",
  "uuid",
 ]
@@ -7729,7 +7738,7 @@ version = "1.6.3"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675"
 dependencies = [
- "cfg-if 0.1.10",
+ "cfg-if 1.0.0",
  "rand 0.8.5",
  "static_assertions",
 ]
@@ -7884,12 +7893,14 @@ dependencies = [
  "chrono",
  "codec",
  "common_types",
+ "crc32fast",
  "futures 0.3.28",
  "generic_error",
  "horaedbproto 2.0.0",
  "lazy_static",
  "logger",
  "macros",
+ "memmap2 0.9.4",
  "message_queue",
  "prometheus 0.12.0",
  "prost 0.11.8",
diff --git a/integration_tests/Makefile b/integration_tests/Makefile
index e5f94e03..e6ce21bd 100644
--- a/integration_tests/Makefile
+++ b/integration_tests/Makefile
@@ -54,7 +54,7 @@ build-meta:
        ./build_meta.sh
 
 build-horaedb:
-       cd .. && cargo build --bin horaedb-server --features 
wal-table-kv,wal-message-queue,wal-rocksdb
+       cd .. && cargo build --bin horaedb-server --features 
wal-table-kv,wal-message-queue,wal-rocksdb,wal-local-storage
 
 build-test:
        cargo build
diff --git a/src/analytic_engine/Cargo.toml b/src/analytic_engine/Cargo.toml
index 1e02406a..8197b4ee 100644
--- a/src/analytic_engine/Cargo.toml
+++ b/src/analytic_engine/Cargo.toml
@@ -35,6 +35,7 @@ test = ["tempfile"]
 wal-table-kv = ["wal/wal-table-kv"]
 wal-message-queue = ["wal/wal-message-queue"]
 wal-rocksdb = ["wal/wal-rocksdb"]
+wal-local-storage = ["wal/wal-local-storage"]
 
 [dependencies]
 # In alphabetical order
diff --git a/src/horaedb/Cargo.toml b/src/horaedb/Cargo.toml
index 38f22464..2abfa49e 100644
--- a/src/horaedb/Cargo.toml
+++ b/src/horaedb/Cargo.toml
@@ -31,10 +31,11 @@ workspace = true
 workspace = true
 
 [features]
-default = ["wal-rocksdb", "wal-table-kv", "wal-message-queue"]
+default = ["wal-rocksdb", "wal-table-kv", "wal-message-queue", 
"wal-local-storage"]
 wal-table-kv = ["wal/wal-table-kv", "analytic_engine/wal-table-kv"]
 wal-message-queue = ["wal/wal-message-queue", 
"analytic_engine/wal-message-queue"]
 wal-rocksdb = ["wal/wal-rocksdb", "analytic_engine/wal-rocksdb"]
+wal-local-storage = ["wal/wal-local-storage", 
"analytic_engine/wal-local-storage"]
 
 [dependencies]
 analytic_engine = { workspace = true }
diff --git a/src/horaedb/src/setup.rs b/src/horaedb/src/setup.rs
index 1cc0e2ab..9bdb46da 100644
--- a/src/horaedb/src/setup.rs
+++ b/src/horaedb/src/setup.rs
@@ -188,6 +188,22 @@ pub fn run_server(config: Config, log_runtime: 
RuntimeLevel) {
                     panic!("Message Queue WAL not bundled!");
                 }
             }
+            StorageConfig::Local(_) => {
+                #[cfg(feature = "wal-local-storage")]
+                {
+                    use 
wal::local_storage_impl::wal_manager::LocalStorageWalsOpener;
+                    run_server_with_runtimes::<LocalStorageWalsOpener>(
+                        config,
+                        engine_runtimes,
+                        log_runtime,
+                    )
+                    .await;
+                }
+                #[cfg(not(feature = "wal-local-storage"))]
+                {
+                    panic!("Local Storage WAL not bundled!");
+                }
+            }
         }
     });
 }
diff --git a/src/wal/Cargo.toml b/src/wal/Cargo.toml
index 6c8be8c1..0d13ef36 100644
--- a/src/wal/Cargo.toml
+++ b/src/wal/Cargo.toml
@@ -40,10 +40,11 @@ optional = true
 wal-message-queue = ["dep:message_queue"]
 wal-table-kv = ["dep:table_kv"]
 wal-rocksdb = ["dep:rocksdb"]
+wal-local-storage = ["memmap2", "crc32fast"]
 
 [[test]]
 name = "read_write"
-required-features = ["wal-message-queue", "wal-table-kv", "wal-rocksdb"]
+required-features = ["wal-message-queue", "wal-table-kv", "wal-rocksdb", 
"wal-local-storage"]
 
 [dependencies]
 async-trait = { workspace = true }
@@ -51,12 +52,14 @@ bytes_ext = { workspace = true }
 chrono = { workspace = true }
 codec = { workspace = true }
 common_types = { workspace = true }
+crc32fast = { version = "1.4.2", optional = true }
 futures = { workspace = true, features = ["async-await"], optional = true }
 generic_error = { workspace = true }
 horaedbproto = { workspace = true }
 lazy_static = { workspace = true }
 logger = { workspace = true }
 macros = { workspace = true }
+memmap2 = { version = "0.9.4", optional = true }
 message_queue = { workspace = true, optional = true }
 prometheus = { workspace = true }
 prost = { workspace = true }
diff --git a/src/wal/src/config.rs b/src/wal/src/config.rs
index 8a13e00c..3169a7cb 100644
--- a/src/wal/src/config.rs
+++ b/src/wal/src/config.rs
@@ -35,6 +35,12 @@ pub type KafkaStorageConfig = 
crate::message_queue_impl::config::KafkaStorageCon
 #[derive(Debug, Default, Clone, Deserialize, Serialize)]
 pub struct KafkaStorageConfig;
 
+#[cfg(feature = "wal-local-storage")]
+pub type LocalStorageConfig = 
crate::local_storage_impl::config::LocalStorageConfig;
+#[cfg(not(feature = "wal-local-storage"))]
+#[derive(Debug, Default, Clone, Deserialize, Serialize)]
+pub struct LocalStorageConfig;
+
 #[derive(Debug, Clone, Deserialize, Serialize)]
 pub struct Config {
     // The flatten attribute inlines keys from a field into the parent struct.
@@ -63,4 +69,5 @@ pub enum StorageConfig {
     RocksDB(Box<RocksDBStorageConfig>),
     Obkv(Box<ObkvStorageConfig>),
     Kafka(Box<KafkaStorageConfig>),
+    Local(Box<LocalStorageConfig>),
 }
diff --git a/src/wal/src/lib.rs b/src/wal/src/lib.rs
index a03bdab1..c5731178 100644
--- a/src/wal/src/lib.rs
+++ b/src/wal/src/lib.rs
@@ -22,6 +22,8 @@
 pub mod config;
 mod dummy;
 pub mod kv_encoder;
+#[cfg(feature = "wal-local-storage")]
+pub mod local_storage_impl;
 pub mod log_batch;
 pub mod manager;
 #[cfg(feature = "wal-message-queue")]
diff --git a/src/wal/src/lib.rs b/src/wal/src/local_storage_impl/config.rs
similarity index 65%
copy from src/wal/src/lib.rs
copy to src/wal/src/local_storage_impl/config.rs
index a03bdab1..59a0c81b 100644
--- a/src/wal/src/lib.rs
+++ b/src/wal/src/local_storage_impl/config.rs
@@ -15,19 +15,21 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Write Ahead Log
+use serde::{Deserialize, Serialize};
 
-#![feature(trait_alias)]
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct LocalStorageConfig {
+    pub path: String,
+    pub max_segment_size: u64,
+    pub cache_size: usize,
+}
 
-pub mod config;
-mod dummy;
-pub mod kv_encoder;
-pub mod log_batch;
-pub mod manager;
-#[cfg(feature = "wal-message-queue")]
-pub mod message_queue_impl;
-pub(crate) mod metrics;
-#[cfg(feature = "wal-rocksdb")]
-pub mod rocksdb_impl;
-#[cfg(feature = "wal-table-kv")]
-pub mod table_kv_impl;
+impl Default for LocalStorageConfig {
+    fn default() -> Self {
+        Self {
+            path: "/tmp/horaedb".to_string(),
+            max_segment_size: 64 * 1024 * 1024, // 64MB
+            cache_size: 3,
+        }
+    }
+}
diff --git a/src/wal/src/lib.rs b/src/wal/src/local_storage_impl/mod.rs
similarity index 72%
copy from src/wal/src/lib.rs
copy to src/wal/src/local_storage_impl/mod.rs
index a03bdab1..7bb5bf63 100644
--- a/src/wal/src/lib.rs
+++ b/src/wal/src/local_storage_impl/mod.rs
@@ -15,19 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Write Ahead Log
-
-#![feature(trait_alias)]
-
 pub mod config;
-mod dummy;
-pub mod kv_encoder;
-pub mod log_batch;
-pub mod manager;
-#[cfg(feature = "wal-message-queue")]
-pub mod message_queue_impl;
-pub(crate) mod metrics;
-#[cfg(feature = "wal-rocksdb")]
-pub mod rocksdb_impl;
-#[cfg(feature = "wal-table-kv")]
-pub mod table_kv_impl;
+mod record_encoding;
+mod segment;
+pub mod wal_manager;
diff --git a/src/wal/src/local_storage_impl/record_encoding.rs 
b/src/wal/src/local_storage_impl/record_encoding.rs
new file mode 100644
index 00000000..f7c55b29
--- /dev/null
+++ b/src/wal/src/local_storage_impl/record_encoding.rs
@@ -0,0 +1,275 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use bytes_ext::{Buf, BufMut, SafeBuf, SafeBufMut};
+use codec::Encoder;
+use generic_error::GenericError;
+use macros::define_result;
+use snafu::{ensure, Backtrace, ResultExt, Snafu};
+
+pub const RECORD_ENCODING_V0: u8 = 0;
+pub const NEWEST_RECORD_ENCODING_VERSION: u8 = RECORD_ENCODING_V0;
+
+pub const VERSION_SIZE: usize = 1;
+pub const CRC_SIZE: usize = 4;
+pub const RECORD_LENGTH_SIZE: usize = 4;
+
+#[derive(Debug, Snafu)]
+pub enum Error {
+    #[snafu(display("Version mismatch, expect:{}, given:{}", expected, given))]
+    Version { expected: u8, given: u8 },
+
+    #[snafu(display("Failed to encode, err:{}", source))]
+    Encoding { source: bytes_ext::Error },
+
+    #[snafu(display("Failed to decode, err:{}", source))]
+    Decoding { source: bytes_ext::Error },
+
+    #[snafu(display("Invalid record: {}, backtrace:\n{}", source, backtrace))]
+    InvalidRecord {
+        source: GenericError,
+        backtrace: Backtrace,
+    },
+
+    #[snafu(display("Length mismatch: expected {} but found {}", expected, 
actual))]
+    LengthMismatch { expected: usize, actual: usize },
+
+    #[snafu(display("Checksum mismatch: expected {}, but got {}", expected, 
actual))]
+    ChecksumMismatch { expected: u32, actual: u32 },
+}
+
+define_result!(Error);
+
+/// Record format:
+///
+/// ```text
+/// 
+---------+--------+--------+------------+--------------+--------------+-------+
+/// | version |  crc   | length |  table id  | sequence num | value length | 
value |
+/// |  (u8)   | (u32)  | (u32)  |   (u64)    |    (u64)     |     (u32)    |   
    |
+/// 
+---------+--------+--------+------------+--------------+--------------+-------+
+/// ```
+pub struct Record<'a> {
+    /// The version number of the record.
+    pub version: u8,
+
+    /// The CRC checksum of the record.
+    pub crc: u32,
+
+    /// The length of the record (excluding version, crc and length).
+    pub length: u32,
+
+    /// Identifier for tables.
+    pub table_id: u64,
+
+    /// Identifier for records, incrementing.
+    pub sequence_num: u64,
+
+    /// The length of the value in bytes.
+    pub value_length: u32,
+
+    /// Common log value.
+    pub value: &'a [u8],
+}
+
+impl<'a> Record<'a> {
+    pub fn new(table_id: u64, sequence_num: u64, value: &'a [u8]) -> 
Result<Self> {
+        let mut record = Record {
+            version: NEWEST_RECORD_ENCODING_VERSION,
+            crc: 0,
+            length: (8 + 8 + 4 + value.len()) as u32,
+            table_id,
+            sequence_num,
+            value_length: value.len() as u32,
+            value,
+        };
+
+        // Calculate CRC
+        let mut buf = Vec::new();
+        buf.try_put_u64(table_id).context(Encoding)?;
+        buf.try_put_u64(sequence_num).context(Encoding)?;
+        buf.try_put_u32(record.value_length).context(Encoding)?;
+        buf.extend_from_slice(value);
+        record.crc = crc32fast::hash(&buf);
+
+        Ok(record)
+    }
+
+    // Return the length of the record
+    pub fn len(&self) -> usize {
+        VERSION_SIZE + CRC_SIZE + RECORD_LENGTH_SIZE + self.length as usize
+    }
+}
+
+#[derive(Clone, Debug)]
+pub struct RecordEncoding {
+    version: u8,
+}
+
+impl RecordEncoding {
+    pub fn newest() -> Self {
+        Self {
+            version: NEWEST_RECORD_ENCODING_VERSION,
+        }
+    }
+}
+
+impl Encoder<Record<'_>> for RecordEncoding {
+    type Error = Error;
+
+    fn encode<B: BufMut>(&self, buf: &mut B, record: &Record) -> Result<()> {
+        // Verify version
+        ensure!(
+            record.version == self.version,
+            Version {
+                expected: self.version,
+                given: record.version
+            }
+        );
+
+        buf.try_put_u8(record.version).context(Encoding)?;
+        buf.try_put_u32(record.crc).context(Encoding)?;
+        buf.try_put_u32(record.length).context(Encoding)?;
+        buf.try_put_u64(record.table_id).context(Encoding)?;
+        buf.try_put_u64(record.sequence_num).context(Encoding)?;
+        buf.try_put_u32(record.value_length).context(Encoding)?;
+        buf.try_put(record.value).context(Encoding)?;
+        Ok(())
+    }
+
+    fn estimate_encoded_size(&self, record: &Record) -> usize {
+        record.len()
+    }
+}
+
+impl RecordEncoding {
+    pub fn decode<'a>(&'a self, mut buf: &'a [u8]) -> Result<Record> {
+        // Ensure that buf is not shorter than the shortest record.
+        ensure!(
+            buf.remaining() >= VERSION_SIZE + CRC_SIZE + RECORD_LENGTH_SIZE,
+            LengthMismatch {
+                expected: VERSION_SIZE + CRC_SIZE + RECORD_LENGTH_SIZE,
+                actual: buf.remaining()
+            }
+        );
+
+        // Read version
+        let version = buf.try_get_u8().context(Decoding)?;
+
+        // Verify version
+        ensure!(
+            version == self.version,
+            Version {
+                expected: self.version,
+                given: version
+            }
+        );
+
+        // Read CRC
+        let crc = buf.try_get_u32().context(Decoding)?;
+
+        // Read length
+        let length = buf.try_get_u32().context(Decoding)?;
+
+        // Ensure the buf is long enough
+        ensure!(
+            buf.remaining() >= length as usize,
+            LengthMismatch {
+                expected: length as usize,
+                actual: buf.remaining()
+            }
+        );
+
+        // Verify CRC
+        let data = &buf[0..length as usize];
+        let computed_crc = crc32fast::hash(data);
+        ensure!(
+            computed_crc == crc,
+            ChecksumMismatch {
+                expected: crc,
+                actual: computed_crc
+            }
+        );
+
+        // Read table id
+        let table_id = buf.try_get_u64().context(Decoding)?;
+
+        // Read sequence number
+        let sequence_num = buf.try_get_u64().context(Decoding)?;
+
+        // Read value length
+        let value_length = buf.try_get_u32().context(Decoding)?;
+
+        // Read value
+        let value = &buf[0..value_length as usize];
+        buf.advance(value_length as usize);
+
+        Ok(Record {
+            version,
+            crc,
+            length,
+            table_id,
+            sequence_num,
+            value_length,
+            value,
+        })
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use bytes_ext::BytesMut;
+    use codec::Encoder;
+
+    use crate::local_storage_impl::record_encoding::{Record, RecordEncoding};
+
+    #[test]
+    fn test_record_encoding() {
+        let table_id = 1;
+        let sequence_num = 2;
+        let value = b"test_value";
+        let record = Record::new(table_id, sequence_num, value).unwrap();
+
+        let encoder = RecordEncoding::newest();
+        let mut buf = BytesMut::new();
+        encoder.encode(&mut buf, &record).unwrap();
+
+        let expected_len = record.len();
+        assert_eq!(buf.len(), expected_len);
+    }
+
+    #[test]
+    fn test_record_decoding() {
+        let table_id = 1;
+        let sequence_num = 2;
+        let value = b"test_value";
+        let record = Record::new(table_id, sequence_num, value).unwrap();
+
+        let encoder = RecordEncoding::newest();
+        let mut buf = BytesMut::new();
+        encoder.encode(&mut buf, &record).unwrap();
+
+        let decoded_record = encoder.decode(&buf).unwrap();
+
+        assert_eq!(decoded_record.version, record.version);
+        assert_eq!(decoded_record.crc, record.crc);
+        assert_eq!(decoded_record.length, record.length);
+        assert_eq!(decoded_record.table_id, record.table_id);
+        assert_eq!(decoded_record.sequence_num, record.sequence_num);
+        assert_eq!(decoded_record.value_length, record.value_length);
+        assert_eq!(decoded_record.value, record.value);
+    }
+}
diff --git a/src/wal/src/local_storage_impl/segment.rs 
b/src/wal/src/local_storage_impl/segment.rs
new file mode 100644
index 00000000..f44ea131
--- /dev/null
+++ b/src/wal/src/local_storage_impl/segment.rs
@@ -0,0 +1,900 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{
+    collections::{HashMap, VecDeque},
+    fmt::Debug,
+    fs,
+    fs::{File, OpenOptions},
+    io,
+    io::Write,
+    path::Path,
+    sync::{
+        atomic::{AtomicU64, Ordering},
+        Arc, Mutex,
+    },
+};
+
+use codec::Encoder;
+use common_types::{table::TableId, SequenceNumber, MAX_SEQUENCE_NUMBER, 
MIN_SEQUENCE_NUMBER};
+use generic_error::{BoxError, GenericError};
+use macros::define_result;
+use memmap2::{MmapMut, MmapOptions};
+use runtime::Runtime;
+use snafu::{ensure, Backtrace, ResultExt, Snafu};
+
+use crate::{
+    kv_encoder::CommonLogEncoding,
+    local_storage_impl::record_encoding::{Record, RecordEncoding},
+    log_batch::{LogEntry, LogWriteBatch},
+    manager::{
+        BatchLogIteratorAdapter, Read, ReadContext, ReadRequest, RegionId, 
ScanContext,
+        ScanRequest, SyncLogIterator, WalLocation, WriteContext,
+    },
+};
+
+#[derive(Debug, Snafu)]
+pub enum Error {
+    #[snafu(display("Failed to open or create file: {}", source))]
+    FileOpen { source: io::Error },
+
+    #[snafu(display("Failed to open or create dir: {}", source))]
+    DirOpen { source: io::Error },
+
+    #[snafu(display("Failed to map file to memory: {}", source))]
+    Mmap { source: io::Error },
+
+    #[snafu(display("Segment full"))]
+    SegmentFull,
+
+    #[snafu(display("Failed to append data to segment file: {}", source))]
+    SegmentAppend { source: io::Error },
+
+    #[snafu(display("Failed to flush mmap: {}", source))]
+    Flush { source: io::Error },
+
+    #[snafu(display(
+        "Attempted to read beyond segment size. Offset: {}, Size: {}, 
FileSize: {}",
+        offset,
+        size,
+        file_size
+    ))]
+    ReadOutOfBounds {
+        offset: u64,
+        size: u64,
+        file_size: u64,
+    },
+
+    #[snafu(display("Invalid segment header"))]
+    InvalidHeader,
+
+    #[snafu(display("Segment not open, id:{}", id))]
+    SegmentNotOpen { id: u64 },
+
+    #[snafu(display("Segment not found, id:{}", id))]
+    SegmentNotFound { id: u64 },
+
+    #[snafu(display("Unable to convert slice: {}", source))]
+    Conversion {
+        source: std::array::TryFromSliceError,
+    },
+
+    #[snafu(display("{}", source))]
+    Encoding { source: GenericError },
+
+    #[snafu(display("Invalid record: {}, backtrace:\n{}", source, backtrace))]
+    InvalidRecord {
+        source: GenericError,
+        backtrace: Backtrace,
+    },
+
+    #[snafu(display("Length mismatch: expected {} but found {}", expected, 
actual))]
+    LengthMismatch { expected: usize, actual: usize },
+
+    #[snafu(display("Checksum mismatch: expected {}, but got {}", expected, 
actual))]
+    ChecksumMismatch { expected: u32, actual: u32 },
+}
+
+define_result!(Error);
+
+const SEGMENT_HEADER: &[u8] = b"HoraeDBWAL";
+const WAL_SEGMENT_V0: u8 = 0;
+const NEWEST_WAL_SEGMENT_VERSION: u8 = WAL_SEGMENT_V0;
+const VERSION_SIZE: usize = 1;
+
+// todo: make MAX_FILE_SIZE configurable
+const MAX_FILE_SIZE: u64 = 64 * 1024 * 1024;
+
+/// Segment file format:
+///
+/// ```text
+/// +-------------+--------+--------+--------+---+--------+
+/// | Version(u8) | Header | Record | Record |...| Record |
+/// +-------------+--------+--------+--------+---+--------+
+/// ```
+///
+/// The `Header` is a fixed string. The format of the `Record` can be 
referenced
+/// in the struct `Record`.
+#[derive(Debug)]
+pub struct Segment {
+    /// The version of the segment.
+    version: u8,
+
+    /// The file path of the segment.
+    path: String,
+
+    /// A unique identifier for the segment.
+    id: u64,
+
+    /// The size of the segment in bytes.
+    size: u64,
+
+    /// The minimum sequence number of records within this segment.
+    min_seq: SequenceNumber,
+
+    /// The maximum sequence number of records within this segment.
+    max_seq: SequenceNumber,
+
+    /// The encoding format used for records within this segment.
+    record_encoding: RecordEncoding,
+
+    /// An optional file handle for the segment.
+    /// This may be `None` if the file is not currently open.
+    file: Option<File>,
+
+    /// An optional memory-mapped mutable buffer of the segment.
+    /// This may be `None` if the segment is not memory-mapped.
+    mmap: Option<MmapMut>,
+
+    /// An optional vector of positions within the segment.
+    /// This may be `None` if the segment is not memory-mapped.
+    record_position: Option<Vec<Position>>,
+}
+
+#[derive(Debug, Clone)]
+pub struct Position {
+    start: u64,
+    end: u64,
+}
+
+impl Segment {
+    pub fn new(path: String, segment_id: u64) -> Result<Segment> {
+        if !Path::new(&path).exists() {
+            let mut file = File::create(&path).context(FileOpen)?;
+            file.write_all(&[NEWEST_WAL_SEGMENT_VERSION])
+                .context(FileOpen)?;
+            file.write_all(SEGMENT_HEADER).context(FileOpen)?;
+        }
+        Ok(Segment {
+            version: NEWEST_WAL_SEGMENT_VERSION,
+            path,
+            id: segment_id,
+            size: SEGMENT_HEADER.len() as u64,
+            min_seq: MAX_SEQUENCE_NUMBER,
+            max_seq: MIN_SEQUENCE_NUMBER,
+            record_encoding: RecordEncoding::newest(),
+            file: None,
+            mmap: None,
+            record_position: None,
+        })
+    }
+
+    pub fn open(&mut self) -> Result<()> {
+        // Open the segment file
+        let file = OpenOptions::new()
+            .read(true)
+            .append(true)
+            .open(&self.path)
+            .context(FileOpen)?;
+
+        let metadata = file.metadata().context(FileOpen)?;
+        let size = metadata.len();
+
+        // Map the file in memory
+        let mmap = unsafe { MmapOptions::new().map_mut(&file).context(Mmap)? };
+
+        // Validate segment version
+        let version = mmap[0];
+        ensure!(version == self.version, InvalidHeader);
+
+        // Validate segment header
+        let header_len = SEGMENT_HEADER.len();
+        ensure!(size >= header_len as u64, InvalidHeader);
+        let header = &mmap[VERSION_SIZE..VERSION_SIZE + header_len];
+        ensure!(header == SEGMENT_HEADER, InvalidHeader);
+
+        // Read and validate all records
+        let mut pos = VERSION_SIZE + header_len;
+        let mut record_position = Vec::new();
+        while pos < size as usize {
+            let data = &mmap[pos..];
+
+            let record = self
+                .record_encoding
+                .decode(data)
+                .box_err()
+                .context(InvalidRecord)?;
+
+            record_position.push(Position {
+                start: pos as u64,
+                end: (pos + record.len()) as u64,
+            });
+
+            // Move to the next record
+            pos += record.len();
+        }
+
+        self.file = Some(file);
+        self.mmap = Some(mmap);
+        self.record_position = Some(record_position);
+        self.size = size;
+        Ok(())
+    }
+
+    pub fn close(&mut self) -> Result<()> {
+        self.file.take();
+        self.mmap.take();
+        self.record_position.take();
+        Ok(())
+    }
+
+    /// Append a slice to the segment file.
+    pub fn append(&mut self, data: &[u8]) -> Result<()> {
+        ensure!(self.size + data.len() as u64 <= MAX_FILE_SIZE, SegmentFull);
+
+        // Ensure the segment file is open
+        let Some(file) = &mut self.file else {
+            return SegmentNotOpen { id: self.id }.fail();
+        };
+
+        // Append to the file
+        file.write_all(data).context(SegmentAppend)?;
+        file.flush().context(Flush)?;
+
+        // Remap
+        // todo: Do not remap every time you append; instead, create a large 
enough file
+        // at the beginning.
+        let mmap = unsafe { MmapOptions::new().map_mut(&*file).context(Mmap)? 
};
+        self.mmap = Some(mmap);
+        self.size += data.len() as u64;
+
+        Ok(())
+    }
+
+    pub fn read(&self, offset: u64, size: u64) -> Result<Vec<u8>> {
+        // Ensure that the reading range is within the file
+        ensure!(
+            offset + size <= self.size,
+            LengthMismatch {
+                expected: (offset + size) as usize,
+                actual: self.size as usize
+            }
+        );
+
+        let start = offset as usize;
+        let end = start + size as usize;
+        match &self.mmap {
+            Some(mmap) => Ok(mmap[start..end].to_vec()),
+            None => SegmentNotOpen { id: self.id }.fail(),
+        }
+    }
+
+    pub fn append_record_position(&mut self, pos: &mut Vec<Position>) -> 
Result<()> {
+        match self.record_position.as_mut() {
+            Some(record_position) => {
+                record_position.append(pos);
+                Ok(())
+            }
+            None => SegmentNotOpen { id: self.id }.fail(),
+        }
+    }
+
+    pub fn update_seq(&mut self, min_seq: u64, max_seq: u64) -> Result<()> {
+        if min_seq < self.min_seq {
+            self.min_seq = min_seq;
+        }
+        if max_seq > self.max_seq {
+            self.max_seq = max_seq;
+        }
+        Ok(())
+    }
+}
+
+pub struct Region {
+    /// Identifier for regions.
+    _region_id: u64,
+
+    /// All segments protected by a mutex
+    /// todo: maybe use a RWLock?
+    all_segments: Mutex<HashMap<u64, Arc<Mutex<Segment>>>>,
+
+    /// Cache for opened segments
+    cache: Mutex<VecDeque<u64>>,
+
+    /// Maximum size of the cache
+    cache_size: usize,
+
+    /// Directory for segment storage
+    _segment_dir: String,
+
+    /// Index of the latest segment for appending logs
+    current: Mutex<u64>,
+
+    /// Encoding method for logs
+    log_encoding: CommonLogEncoding,
+
+    /// Encoding method for records
+    record_encoding: RecordEncoding,
+
+    /// Sequence number for the next log
+    next_sequence_num: AtomicU64,
+
+    /// Runtime for handling write requests
+    runtime: Arc<Runtime>,
+}
+
+impl Region {
+    pub fn new(
+        region_id: u64,
+        cache_size: usize,
+        segment_dir: String,
+        runtime: Arc<Runtime>,
+    ) -> Result<Self> {
+        let mut all_segments = HashMap::new();
+
+        // Scan the directory for existing WAL files
+        let mut max_segment_id: i32 = -1;
+
+        // Segment file naming convention: segment_<id>.wal
+        for entry in fs::read_dir(&segment_dir).context(FileOpen)? {
+            let entry = entry.context(FileOpen)?;
+
+            let path = entry.path();
+
+            if !path.is_file() {
+                continue;
+            }
+
+            match path.extension() {
+                Some(ext) if ext == "wal" => ext,
+                _ => continue,
+            };
+
+            let file_name = match path.file_name().and_then(|name| 
name.to_str()) {
+                Some(name) => name,
+                None => continue,
+            };
+
+            let segment_id = match file_name
+                .trim_start_matches("segment_")
+                .trim_end_matches(".wal")
+                .parse::<u64>()
+                .ok()
+            {
+                Some(id) => id,
+                None => continue,
+            };
+
+            let segment = Segment::new(path.to_string_lossy().to_string(), 
segment_id)?;
+            let segment = Arc::new(Mutex::new(segment));
+
+            if segment_id as i32 > max_segment_id {
+                max_segment_id = segment_id as i32;
+            }
+            all_segments.insert(segment_id, segment);
+        }
+
+        // If no existing segments, create a new one
+        if max_segment_id == -1 {
+            max_segment_id = 0;
+            let path = format!("{}/segment_{}.wal", segment_dir, 
max_segment_id);
+            let new_segment = Segment::new(path, max_segment_id as u64)?;
+            let new_segment = Arc::new(Mutex::new(new_segment));
+            all_segments.insert(0, new_segment);
+        }
+
+        Ok(Self {
+            _region_id: region_id,
+            all_segments: Mutex::new(all_segments),
+            cache: Mutex::new(VecDeque::new()),
+            cache_size,
+            _segment_dir: segment_dir,
+            current: Mutex::new(max_segment_id as u64),
+            log_encoding: CommonLogEncoding::newest(),
+            record_encoding: RecordEncoding::newest(),
+            // todo: do not use MIN_SEQUENCE_NUMBER, read from the latest 
record
+            next_sequence_num: AtomicU64::new(MIN_SEQUENCE_NUMBER + 1),
+            runtime,
+        })
+    }
+
+    /// Obtain the target segment. If it is not open, then open it and put it 
to
+    /// the cache.
+    fn get_segment(&self, segment_id: u64) -> Result<Arc<Mutex<Segment>>> {
+        let mut cache = self.cache.lock().unwrap();
+        let all_segments = self.all_segments.lock().unwrap();
+
+        let segment = all_segments.get(&segment_id);
+
+        let segment = match segment {
+            Some(segment) => segment,
+            None => return SegmentNotFound { id: segment_id }.fail(),
+        };
+
+        // Check if segment is already in cache
+        if cache.iter().any(|id| *id == segment_id) {
+            let segment = all_segments.get(&segment_id);
+            return match segment {
+                Some(segment) => Ok(segment.clone()),
+                None => SegmentNotFound { id: segment_id }.fail(),
+            };
+        }
+
+        // If not in cache, load from disk
+        segment.lock().unwrap().open()?;
+
+        // Add to cache
+        if cache.len() == self.cache_size {
+            let evicted_segment_id = cache.pop_front();
+            // TODO: if the evicted segment is being read or written, wait for 
it to finish
+            if let Some(evicted_segment_id) = evicted_segment_id {
+                let evicted_segment = all_segments.get(&evicted_segment_id);
+                if let Some(evicted_segment) = evicted_segment {
+                    evicted_segment.lock().unwrap().close()?;
+                } else {
+                    return SegmentNotFound {
+                        id: evicted_segment_id,
+                    }
+                    .fail();
+                }
+            }
+        }
+        cache.push_back(segment_id);
+
+        Ok(segment.clone())
+    }
+
+    pub fn write(&self, _ctx: &WriteContext, batch: &LogWriteBatch) -> 
Result<SequenceNumber> {
+        // Lock
+        let current = self.current.lock().unwrap();
+        let segment = self.get_segment(*current)?;
+        let mut segment = segment.lock().unwrap();
+
+        let entries_num = batch.len() as u64;
+        let table_id = batch.location.table_id;
+
+        // Allocate sequence number
+        let prev_sequence_num = self.alloc_sequence_num(entries_num);
+        let mut next_sequence_num = prev_sequence_num;
+
+        let mut data = Vec::new();
+        let mut record_position = Vec::new();
+
+        for entry in &batch.entries {
+            // Encode the record
+            let record = Record::new(table_id, next_sequence_num, 
&entry.payload)
+                .box_err()
+                .context(Encoding)?;
+            self.record_encoding
+                .encode(&mut data, &record)
+                .box_err()
+                .context(Encoding)?;
+
+            record_position.push(Position {
+                start: (data.len() - record.len()) as u64,
+                end: data.len() as u64,
+            });
+
+            next_sequence_num += 1;
+        }
+
+        // TODO: spawn a new task to write to segment
+        // TODO: maybe need a write mutex?
+
+        for pos in record_position.iter_mut() {
+            pos.start += segment.size;
+            pos.end += segment.size;
+        }
+
+        // Update the record position
+        segment.append_record_position(&mut record_position)?;
+
+        // Update the min and max sequence numbers
+        segment.update_seq(prev_sequence_num, next_sequence_num - 1)?;
+
+        // Append logs to segment file
+        segment.append(&data)?;
+        Ok(next_sequence_num - 1)
+    }
+
+    pub fn read(&self, ctx: &ReadContext, req: &ReadRequest) -> 
Result<BatchLogIteratorAdapter> {
+        // Check read range's validity.
+        let start = if let Some(start) = req.start.as_start_sequence_number() {
+            start
+        } else {
+            MAX_SEQUENCE_NUMBER
+        };
+        let end = if let Some(end) = req.end.as_end_sequence_number() {
+            end
+        } else {
+            MIN_SEQUENCE_NUMBER
+        };
+        if start > end {
+            return Ok(BatchLogIteratorAdapter::empty());
+        }
+        let iter = SegmentLogIterator::new(
+            self.log_encoding.clone(),
+            self.record_encoding.clone(),
+            self.get_segment(0)?,
+            Some(req.location.table_id),
+            start,
+            end,
+        );
+        Ok(BatchLogIteratorAdapter::new_with_sync(
+            Box::new(iter),
+            self.runtime.clone(),
+            ctx.batch_size,
+        ))
+    }
+
+    pub fn scan(&self, ctx: &ScanContext, _req: &ScanRequest) -> 
Result<BatchLogIteratorAdapter> {
+        let iter = SegmentLogIterator::new(
+            self.log_encoding.clone(),
+            self.record_encoding.clone(),
+            self.get_segment(0)?,
+            None,
+            MIN_SEQUENCE_NUMBER,
+            MAX_SEQUENCE_NUMBER,
+        );
+        Ok(BatchLogIteratorAdapter::new_with_sync(
+            Box::new(iter),
+            self.runtime.clone(),
+            ctx.batch_size,
+        ))
+    }
+
+    pub fn mark_delete_entries_up_to(
+        &self,
+        _location: WalLocation,
+        _sequence_num: SequenceNumber,
+    ) -> Result<()> {
+        todo!()
+    }
+
+    #[inline]
+    fn alloc_sequence_num(&self, number: u64) -> SequenceNumber {
+        self.next_sequence_num.fetch_add(number, Ordering::Relaxed)
+    }
+
+    pub fn sequence_num(&self, _location: WalLocation) -> 
Result<SequenceNumber> {
+        let next_seq_num = self.next_sequence_num.load(Ordering::Relaxed);
+        debug_assert!(next_seq_num > 0);
+        Ok(next_seq_num - 1)
+    }
+}
+
/// Manages all WAL regions stored under a common root directory, creating
/// regions lazily on first access.
pub struct RegionManager {
    /// Root directory; each region lives in `<root_dir>/<region_id>`.
    root_dir: String,
    /// All opened regions, keyed by region id.
    regions: Mutex<HashMap<u64, Arc<Region>>>,
    /// Per-region limit on the number of open segment files, forwarded to
    /// `Region::new`.
    cache_size: usize,
    /// Runtime handed to each region for serving requests.
    runtime: Arc<Runtime>,
}
+
+impl RegionManager {
+    // Create a RegionManager, and scans all the region folders located under
+    // root_dir.
+    pub fn new(root_dir: String, cache_size: usize, runtime: Arc<Runtime>) -> 
Result<Self> {
+        let mut regions = HashMap::new();
+
+        // Naming conversion: <root_dir>/<region_id>
+        for entry in fs::read_dir(&root_dir).context(DirOpen)? {
+            let entry = entry.context(DirOpen)?;
+
+            let path = entry.path();
+            if path.is_file() {
+                continue;
+            }
+
+            let dir_name = match path.file_name().and_then(|name| 
name.to_str()) {
+                Some(name) => name,
+                None => continue,
+            };
+
+            // Parse region id from dir name
+            let region_id = match dir_name.parse::<u64>().ok() {
+                Some(id) => id,
+                None => continue,
+            };
+
+            let region = Region::new(
+                region_id,
+                cache_size,
+                path.to_string_lossy().to_string(),
+                runtime.clone(),
+            )?;
+            regions.insert(region_id, Arc::new(region));
+        }
+
+        Ok(Self {
+            root_dir,
+            regions: Mutex::new(regions),
+            cache_size,
+            runtime,
+        })
+    }
+
+    /// Retrieve a region by its `region_id`. If the region does not exist,
+    /// create a new one.
+    fn get_region(&self, region_id: RegionId) -> Result<Arc<Region>> {
+        let mut regions = self.regions.lock().unwrap();
+        if let Some(region) = regions.get(&region_id) {
+            return Ok(region.clone());
+        }
+
+        let region_dir = Path::new(&self.root_dir).join(region_id.to_string());
+        fs::create_dir_all(&region_dir).context(DirOpen)?;
+
+        let region = Region::new(
+            region_id,
+            self.cache_size,
+            region_dir.to_string_lossy().to_string(),
+            self.runtime.clone(),
+        )?;
+
+        Ok(regions.entry(region_id).or_insert(Arc::new(region)).clone())
+    }
+
+    pub fn write(&self, ctx: &WriteContext, batch: &LogWriteBatch) -> 
Result<SequenceNumber> {
+        let region = self.get_region(batch.location.region_id)?;
+        region.write(ctx, batch)
+    }
+
+    pub fn read(&self, ctx: &ReadContext, req: &ReadRequest) -> 
Result<BatchLogIteratorAdapter> {
+        let region = self.get_region(req.location.region_id)?;
+        region.read(ctx, req)
+    }
+
+    pub fn scan(&self, ctx: &ScanContext, req: &ScanRequest) -> 
Result<BatchLogIteratorAdapter> {
+        let region = self.get_region(req.region_id)?;
+        region.scan(ctx, req)
+    }
+
+    pub fn mark_delete_entries_up_to(
+        &self,
+        location: WalLocation,
+        sequence_num: SequenceNumber,
+    ) -> Result<()> {
+        let region = self.get_region(location.region_id)?;
+        region.mark_delete_entries_up_to(location, sequence_num)
+    }
+
+    pub fn sequence_num(&self, location: WalLocation) -> 
Result<SequenceNumber> {
+        let region = self.get_region(location.region_id)?;
+        region.sequence_num(location)
+    }
+}
+
// TODO: handle the case when read requests involving multiple segments
/// Iterator over the records of a single segment, optionally filtered by
/// table id and restricted to a sequence-number range.
#[derive(Debug)]
pub struct SegmentLogIterator {
    /// Encoding method for common log.
    log_encoding: CommonLogEncoding,

    /// Encoding method for records.
    record_encoding: RecordEncoding,

    /// Thread-safe, shared reference to the log segment.
    segment: Arc<Mutex<Segment>>,

    /// Optional identifier for the table, which is used to filter logs.
    table_id: Option<TableId>,

    /// Starting sequence number for log iteration (inclusive).
    start: SequenceNumber,

    /// Ending sequence number for log iteration (inclusive).
    end: SequenceNumber,

    /// Index of the current record within the segment.
    current_record_idx: usize,

    /// The raw payload data of the current record; owned here so the
    /// returned `LogEntry` can borrow it.
    current_payload: Vec<u8>,

    /// Flag indicating whether there is no more data to read.
    no_more_data: bool,
}
+
impl SegmentLogIterator {
    /// Create an iterator over `segment`, yielding records with sequence
    /// numbers in `[start, end]`, optionally restricted to `table_id`.
    pub fn new(
        log_encoding: CommonLogEncoding,
        record_encoding: RecordEncoding,
        segment: Arc<Mutex<Segment>>,
        table_id: Option<TableId>,
        start: SequenceNumber,
        end: SequenceNumber,
    ) -> Self {
        SegmentLogIterator {
            log_encoding,
            record_encoding,
            segment,
            table_id,
            start,
            end,
            current_record_idx: 0,
            current_payload: Vec::new(),
            no_more_data: false,
        }
    }

    /// Return the next matching log entry, or `None` once the segment is
    /// exhausted or a record past `end` is seen. The returned `LogEntry`
    /// borrows `self.current_payload`, which is overwritten on each call.
    fn next(&mut self) -> Result<Option<LogEntry<&'_ [u8]>>> {
        if self.no_more_data {
            return Ok(None);
        }

        // todo: ensure that this segment is not evicted from the cache during
        // the read process, or that it is reloaded into the cache as needed
        let segment = self.segment.lock().unwrap();

        loop {
            // No recorded positions (e.g. segment not open) means nothing to
            // read; running off the end of the position list ends iteration.
            let Some(record_position) = &segment.record_position else {
                self.no_more_data = true;
                return Ok(None);
            };
            let Some(pos) = record_position.get(self.current_record_idx) else {
                self.no_more_data = true;
                return Ok(None);
            };

            self.current_record_idx += 1;
            let record = segment.read(pos.start, pos.end - pos.start)?;

            // Decode record
            let record = self
                .record_encoding
                .decode(record.as_slice())
                .box_err()
                .context(InvalidRecord)?;

            // Filter by sequence number: skip records before `start`; a
            // record past `end` terminates the iteration entirely.
            if record.sequence_num < self.start {
                continue;
            }
            if record.sequence_num > self.end {
                self.no_more_data = true;
                return Ok(None);
            }

            // Filter by table_id
            if let Some(table_id) = self.table_id {
                if record.table_id != table_id {
                    continue;
                }
            }

            // Decode value
            let value = self
                .log_encoding
                .decode_value(record.value)
                .box_err()
                .context(InvalidRecord)?;

            // Copy the payload into the iterator so the caller can borrow it
            // after the segment lock is released.
            self.current_payload = value.to_owned();

            return Ok(Some(LogEntry {
                table_id: record.table_id,
                sequence: record.sequence_num,
                payload: self.current_payload.as_slice(),
            }));
        }
    }
}
+
impl SyncLogIterator for SegmentLogIterator {
    /// Adapt `SegmentLogIterator::next` to the `SyncLogIterator` trait,
    /// converting the local error type into the manager's `Read` error.
    fn next_log_entry(&mut self) -> crate::manager::Result<Option<LogEntry<&'_ [u8]>>> {
        self.next().box_err().context(Read)
    }
}
+
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use runtime::Builder;
    use tempfile::tempdir;

    use super::*;

    /// Build the conventional path of segment 0 inside `dir`.
    fn segment_path(dir: &tempfile::TempDir) -> String {
        dir.path()
            .join("segment_0.wal")
            .to_str()
            .unwrap()
            .to_string()
    }

    #[test]
    fn test_segment_creation() {
        let dir = tempdir().unwrap();
        let path = segment_path(&dir);

        // `expect` instead of `assert!(is_ok())` keeps the error message on
        // failure instead of discarding it.
        let segment = Segment::new(path.clone(), 0).expect("failed to create segment");
        assert_eq!(segment.version, NEWEST_WAL_SEGMENT_VERSION);
        assert_eq!(segment.path, path);
        assert_eq!(segment.id, 0);
        assert_eq!(segment.size, SEGMENT_HEADER.len() as u64);

        // On disk: one version byte followed by the segment header.
        let segment_content = fs::read(path).unwrap();
        assert_eq!(segment_content[0], NEWEST_WAL_SEGMENT_VERSION);
        assert_eq!(
            &segment_content[VERSION_SIZE..VERSION_SIZE + SEGMENT_HEADER.len()],
            SEGMENT_HEADER
        );
    }

    #[test]
    fn test_segment_open() {
        let dir = tempdir().unwrap();
        let mut segment = Segment::new(segment_path(&dir), 0).unwrap();

        segment.open().expect("failed to open segment");
    }

    #[test]
    fn test_segment_append_and_read() {
        let dir = tempdir().unwrap();
        let mut segment = Segment::new(segment_path(&dir), 0).unwrap();
        segment.open().unwrap();

        let data = b"test_data";
        segment.append(data).expect("append failed");

        // The appended payload starts right after the version byte + header.
        let read_back = segment
            .read(
                (VERSION_SIZE + SEGMENT_HEADER.len()) as u64,
                data.len() as u64,
            )
            .expect("read failed");
        assert_eq!(read_back, data);
    }

    #[test]
    fn test_region_creation() {
        let dir = tempdir().unwrap();
        let runtime = Arc::new(Builder::default().build().unwrap());

        // A region created over an empty dir must bootstrap segment 0.
        // (Renamed from the misleading `segment_manager`: this is a Region.)
        let region = Region::new(1, 1, dir.path().to_str().unwrap().to_string(), runtime)
            .expect("failed to create region");
        region.get_segment(0).expect("segment 0 should exist");
    }
}
diff --git a/src/wal/src/local_storage_impl/wal_manager.rs 
b/src/wal/src/local_storage_impl/wal_manager.rs
new file mode 100644
index 00000000..d05e90d8
--- /dev/null
+++ b/src/wal/src/local_storage_impl/wal_manager.rs
@@ -0,0 +1,195 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{
+    fmt,
+    fmt::{Debug, Formatter},
+    path::{Path, PathBuf},
+    sync::Arc,
+};
+
+use async_trait::async_trait;
+use common_types::SequenceNumber;
+use generic_error::BoxError;
+use logger::{debug, info};
+use runtime::Runtime;
+use snafu::ResultExt;
+
+use crate::{
+    config::{Config, StorageConfig},
+    local_storage_impl::{config::LocalStorageConfig, segment::RegionManager},
+    log_batch::LogWriteBatch,
+    manager::{
+        error::*, BatchLogIteratorAdapter, Open, OpenedWals, ReadContext, 
ReadRequest, RegionId,
+        ScanContext, ScanRequest, WalLocation, WalManager, WalManagerRef, 
WalRuntimes, WalsOpener,
+        WriteContext, MANIFEST_DIR_NAME, WAL_DIR_NAME,
+    },
+};
+
/// Disk-based WAL manager implementation; all regions live under a single
/// WAL directory and are handled by one `RegionManager`.
pub struct LocalStorageImpl {
    /// Configuration of the local-storage WAL; also printed on drop/Debug.
    config: LocalStorageConfig,
    /// Held to keep the runtime alive for the WAL's lifetime; not accessed
    /// directly by this struct.
    _runtime: Arc<Runtime>,
    /// Manages all regions (and their segment files) of this WAL instance.
    segment_manager: RegionManager,
}
+
+impl LocalStorageImpl {
+    pub fn new(
+        wal_path: PathBuf,
+        config: LocalStorageConfig,
+        runtime: Arc<Runtime>,
+    ) -> Result<Self> {
+        let LocalStorageConfig { cache_size, .. } = config.clone();
+        let wal_path_str = wal_path.to_str().unwrap().to_string();
+        let segment_manager = RegionManager::new(wal_path_str.clone(), 
cache_size, runtime.clone())
+            .box_err()
+            .context(Open {
+                wal_path: wal_path_str,
+            })?;
+        Ok(Self {
+            config,
+            _runtime: runtime,
+            segment_manager,
+        })
+    }
+}
+
impl Drop for LocalStorageImpl {
    /// Log the configuration on drop so WAL shutdown is visible in the logs.
    fn drop(&mut self) {
        info!("LocalStorage dropped, config:{:?}", self.config);
    }
}
+
impl Debug for LocalStorageImpl {
    /// Manual `Debug` that prints only the config; the region manager and
    /// runtime fields are omitted from the output.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        f.debug_struct("LocalStorageImpl")
            .field("config", &self.config)
            .finish()
    }
}
+
#[async_trait]
impl WalManager for LocalStorageImpl {
    /// Latest sequence number of the region addressed by `location`.
    async fn sequence_num(&self, location: WalLocation) -> Result<u64> {
        self.segment_manager
            .sequence_num(location)
            .box_err()
            .context(Read)
    }

    /// Mark logs up to `sequence_num` as deletable. The region layer does
    /// not implement deletion yet (`todo!`), so this currently panics when
    /// reached.
    async fn mark_delete_entries_up_to(
        &self,
        location: WalLocation,
        sequence_num: SequenceNumber,
    ) -> Result<()> {
        self.segment_manager
            .mark_delete_entries_up_to(location, sequence_num)
            .box_err()
            .context(Delete)
    }

    /// Closing a region is a no-op for the local-storage WAL.
    async fn close_region(&self, region_id: RegionId) -> Result<()> {
        debug!(
            "Close region for LocalStorage based WAL is noop operation, region_id:{}",
            region_id
        );

        Ok(())
    }

    /// Graceful shutdown; closing open segment files is still a TODO.
    async fn close_gracefully(&self) -> Result<()> {
        info!("Close local storage wal gracefully");
        // todo: close all opened files
        Ok(())
    }

    /// Read logs for one table, delegating to the region manager.
    async fn read_batch(
        &self,
        ctx: &ReadContext,
        req: &ReadRequest,
    ) -> Result<BatchLogIteratorAdapter> {
        self.segment_manager.read(ctx, req).box_err().context(Read)
    }

    /// Append a batch of logs, delegating to the region manager.
    async fn write(&self, ctx: &WriteContext, batch: &LogWriteBatch) -> Result<SequenceNumber> {
        self.segment_manager
            .write(ctx, batch)
            .box_err()
            .context(Write)
    }

    /// Scan all logs of a region, delegating to the region manager.
    async fn scan(&self, ctx: &ScanContext, req: &ScanRequest) -> Result<BatchLogIteratorAdapter> {
        self.segment_manager.scan(ctx, req).box_err().context(Read)
    }

    /// No statistics are collected for the local-storage WAL yet.
    async fn get_statistics(&self) -> Option<String> {
        None
    }
}
+
/// Opener that builds local-storage WAL instances (data + manifest) from a
/// `StorageConfig::Local` configuration.
#[derive(Default)]
pub struct LocalStorageWalsOpener;
+
impl LocalStorageWalsOpener {
    /// Build a `LocalStorageImpl` rooted at `wal_path` and wrap it as a
    /// `WalManagerRef`.
    fn build_manager(
        wal_path: PathBuf,
        runtime: Arc<Runtime>,
        config: LocalStorageConfig,
    ) -> Result<WalManagerRef> {
        Ok(Arc::new(LocalStorageImpl::new(wal_path, config, runtime)?))
    }
}
+
#[async_trait]
impl WalsOpener for LocalStorageWalsOpener {
    /// Open the data WAL and the manifest WAL under the configured
    /// local-storage path.
    ///
    /// Fails with `InvalidWalConfig` when the storage config is not the
    /// `Local` variant. When `config.disable_data` is set, the data WAL is
    /// replaced by the no-op `DoNothing` implementation.
    async fn open_wals(&self, config: &Config, runtimes: WalRuntimes) -> Result<OpenedWals> {
        let local_storage_wal_config = match &config.storage {
            StorageConfig::Local(config) => config.clone(),
            _ => {
                return InvalidWalConfig {
                    msg: format!(
                        "invalid wal storage config while opening local storage wal, config:{config:?}"
                    ),
                }
                    .fail();
            }
        };

        let write_runtime = runtimes.write_runtime.clone();
        let data_path = Path::new(&local_storage_wal_config.path);

        // Data WAL lives in <path>/<WAL_DIR_NAME>, unless disabled.
        let data_wal = if config.disable_data {
            Arc::new(crate::dummy::DoNothing)
        } else {
            Self::build_manager(
                data_path.join(WAL_DIR_NAME),
                write_runtime.clone(),
                *local_storage_wal_config.clone(),
            )?
        };

        // Manifest WAL lives in <path>/<MANIFEST_DIR_NAME>.
        let manifest_wal = Self::build_manager(
            data_path.join(MANIFEST_DIR_NAME),
            write_runtime.clone(),
            *local_storage_wal_config.clone(),
        )?;

        Ok(OpenedWals {
            data_wal,
            manifest_wal,
        })
    }
}
diff --git a/src/wal/tests/read_write.rs b/src/wal/tests/read_write.rs
index 483bfee5..cccde53c 100644
--- a/src/wal/tests/read_write.rs
+++ b/src/wal/tests/read_write.rs
@@ -35,6 +35,7 @@ use tempfile::TempDir;
 use time_ext::ReadableDuration;
 use wal::{
     kv_encoder::LogBatchEncoder,
+    local_storage_impl::{config::LocalStorageConfig, 
wal_manager::LocalStorageImpl},
     log_batch::{LogWriteBatch, MemoryPayload, MemoryPayloadDecoder},
     manager::{
         BatchLogIteratorAdapter, ReadBoundary, ReadContext, ReadRequest, 
ScanRequest, WalLocation,
@@ -70,6 +71,13 @@ fn test_kafka_wal() {
     test_all(builder, true);
 }
 
#[test]
#[ignore = "this test cannot pass completely, since delete is not supported yet"]
fn test_local_storage_wal() {
    // Run the shared WAL test suite against the local-storage implementation
    // (is_distributed = false).
    let builder = LocalStorageWalBuilder;
    test_all(builder, false);
}
+
 fn test_all<B: WalBuilder>(builder: B, is_distributed: bool) {
     test_simple_read_write_default_batch(builder.clone());
     test_simple_read_write_different_batch_size(builder.clone());
@@ -980,6 +988,22 @@ impl Clone for KafkaWalBuilder {
     }
 }
 
/// Test builder that creates a local-storage WAL rooted at the test's data
/// path.
#[derive(Clone, Default)]
pub struct LocalStorageWalBuilder;
+
#[async_trait]
impl WalBuilder for LocalStorageWalBuilder {
    type Wal = LocalStorageImpl;

    /// Build a `LocalStorageImpl` whose config points at `data_path`; all
    /// other config fields keep their defaults.
    async fn build(&self, data_path: &Path, runtime: Arc<Runtime>) -> Arc<Self::Wal> {
        let config = LocalStorageConfig {
            path: data_path.to_str().unwrap().to_string(),
            ..LocalStorageConfig::default()
        };
        Arc::new(LocalStorageImpl::new(data_path.to_path_buf(), config, runtime).unwrap())
    }
}
+
 /// The environment for testing wal.
 pub struct TestEnv<B> {
     pub dir: TempDir,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to