This is an automated email from the ASF dual-hosted git repository.
jiacai2050 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/horaedb.git
The following commit(s) were added to refs/heads/main by this push:
new 8a288405 feat: Add a new disk-based WAL implementation for standalone
deployment (#1552)
8a288405 is described below
commit 8a2884052b4c02d132fd2c30fa6169b84a9d2ffd
Author: Draco <[email protected]>
AuthorDate: Fri Aug 16 14:18:36 2024 +0800
feat: Add a new disk-based WAL implementation for standalone deployment
(#1552)
## Rationale
#1279
## Detailed Changes
1. Added a struct `Segment` responsible for reading and writing segment
files, and it records the offset of each record.
2. Added a struct `SegmentManager` responsible for managing all segments,
including:
1. Reading all segments from the folder upon creation.
2. Writing only to the segment with the largest ID.
3. Maintaining a cache where segments not in the cache are closed, while
segments in the cache have their files open and are memory-mapped using
mmap.
3. Implemented the `WalManager` trait.
## Test Plan
Unit tests.
---
Cargo.lock | 19 +-
integration_tests/Makefile | 2 +-
src/analytic_engine/Cargo.toml | 1 +
src/horaedb/Cargo.toml | 3 +-
src/horaedb/src/setup.rs | 16 +
src/wal/Cargo.toml | 5 +-
src/wal/src/config.rs | 7 +
src/wal/src/lib.rs | 2 +
.../src/{lib.rs => local_storage_impl/config.rs} | 30 +-
src/wal/src/{lib.rs => local_storage_impl/mod.rs} | 18 +-
src/wal/src/local_storage_impl/record_encoding.rs | 275 +++++++
src/wal/src/local_storage_impl/segment.rs | 900 +++++++++++++++++++++
src/wal/src/local_storage_impl/wal_manager.rs | 195 +++++
src/wal/tests/read_write.rs | 24 +
14 files changed, 1461 insertions(+), 36 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 04edee33..c904497a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1710,9 +1710,9 @@ dependencies = [
[[package]]
name = "crc32fast"
-version = "1.3.2"
+version = "1.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b540bd8bc810d3885c6ea91e2018302f68baba2129ab3e88f32389ee9370880d"
+checksum = "a97769d94ddab943e4510d138150169a2758b5ef3eb191a9ee688de3e23ef7b3"
dependencies = [
"cfg-if 1.0.0",
]
@@ -3932,6 +3932,15 @@ dependencies = [
"libc",
]
+[[package]]
+name = "memmap2"
+version = "0.9.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fe751422e4a8caa417e13c3ea66452215d7d63e19e604f4980461212f3ae1322"
+dependencies = [
+ "libc",
+]
+
[[package]]
name = "memoffset"
version = "0.6.5"
@@ -6892,7 +6901,7 @@ source =
"registry+https://github.com/rust-lang/crates.io-index"
checksum = "167a4ffd7c35c143fd1030aa3c2caf76ba42220bd5a6b5f4781896434723b8c3"
dependencies = [
"debugid",
- "memmap2",
+ "memmap2 0.5.10",
"stable_deref_trait",
"uuid",
]
@@ -7729,7 +7738,7 @@ version = "1.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675"
dependencies = [
- "cfg-if 0.1.10",
+ "cfg-if 1.0.0",
"rand 0.8.5",
"static_assertions",
]
@@ -7884,12 +7893,14 @@ dependencies = [
"chrono",
"codec",
"common_types",
+ "crc32fast",
"futures 0.3.28",
"generic_error",
"horaedbproto 2.0.0",
"lazy_static",
"logger",
"macros",
+ "memmap2 0.9.4",
"message_queue",
"prometheus 0.12.0",
"prost 0.11.8",
diff --git a/integration_tests/Makefile b/integration_tests/Makefile
index e5f94e03..e6ce21bd 100644
--- a/integration_tests/Makefile
+++ b/integration_tests/Makefile
@@ -54,7 +54,7 @@ build-meta:
./build_meta.sh
build-horaedb:
- cd .. && cargo build --bin horaedb-server --features
wal-table-kv,wal-message-queue,wal-rocksdb
+ cd .. && cargo build --bin horaedb-server --features
wal-table-kv,wal-message-queue,wal-rocksdb,wal-local-storage
build-test:
cargo build
diff --git a/src/analytic_engine/Cargo.toml b/src/analytic_engine/Cargo.toml
index 1e02406a..8197b4ee 100644
--- a/src/analytic_engine/Cargo.toml
+++ b/src/analytic_engine/Cargo.toml
@@ -35,6 +35,7 @@ test = ["tempfile"]
wal-table-kv = ["wal/wal-table-kv"]
wal-message-queue = ["wal/wal-message-queue"]
wal-rocksdb = ["wal/wal-rocksdb"]
+wal-local-storage = ["wal/wal-local-storage"]
[dependencies]
# In alphabetical order
diff --git a/src/horaedb/Cargo.toml b/src/horaedb/Cargo.toml
index 38f22464..2abfa49e 100644
--- a/src/horaedb/Cargo.toml
+++ b/src/horaedb/Cargo.toml
@@ -31,10 +31,11 @@ workspace = true
workspace = true
[features]
-default = ["wal-rocksdb", "wal-table-kv", "wal-message-queue"]
+default = ["wal-rocksdb", "wal-table-kv", "wal-message-queue",
"wal-local-storage"]
wal-table-kv = ["wal/wal-table-kv", "analytic_engine/wal-table-kv"]
wal-message-queue = ["wal/wal-message-queue",
"analytic_engine/wal-message-queue"]
wal-rocksdb = ["wal/wal-rocksdb", "analytic_engine/wal-rocksdb"]
+wal-local-storage = ["wal/wal-local-storage",
"analytic_engine/wal-local-storage"]
[dependencies]
analytic_engine = { workspace = true }
diff --git a/src/horaedb/src/setup.rs b/src/horaedb/src/setup.rs
index 1cc0e2ab..9bdb46da 100644
--- a/src/horaedb/src/setup.rs
+++ b/src/horaedb/src/setup.rs
@@ -188,6 +188,22 @@ pub fn run_server(config: Config, log_runtime:
RuntimeLevel) {
panic!("Message Queue WAL not bundled!");
}
}
+ StorageConfig::Local(_) => {
+ #[cfg(feature = "wal-local-storage")]
+ {
+ use
wal::local_storage_impl::wal_manager::LocalStorageWalsOpener;
+ run_server_with_runtimes::<LocalStorageWalsOpener>(
+ config,
+ engine_runtimes,
+ log_runtime,
+ )
+ .await;
+ }
+ #[cfg(not(feature = "wal-local-storage"))]
+ {
+ panic!("Local Storage WAL not bundled!");
+ }
+ }
}
});
}
diff --git a/src/wal/Cargo.toml b/src/wal/Cargo.toml
index 6c8be8c1..0d13ef36 100644
--- a/src/wal/Cargo.toml
+++ b/src/wal/Cargo.toml
@@ -40,10 +40,11 @@ optional = true
wal-message-queue = ["dep:message_queue"]
wal-table-kv = ["dep:table_kv"]
wal-rocksdb = ["dep:rocksdb"]
+wal-local-storage = ["memmap2", "crc32fast"]
[[test]]
name = "read_write"
-required-features = ["wal-message-queue", "wal-table-kv", "wal-rocksdb"]
+required-features = ["wal-message-queue", "wal-table-kv", "wal-rocksdb",
"wal-local-storage"]
[dependencies]
async-trait = { workspace = true }
@@ -51,12 +52,14 @@ bytes_ext = { workspace = true }
chrono = { workspace = true }
codec = { workspace = true }
common_types = { workspace = true }
+crc32fast = { version = "1.4.2", optional = true }
futures = { workspace = true, features = ["async-await"], optional = true }
generic_error = { workspace = true }
horaedbproto = { workspace = true }
lazy_static = { workspace = true }
logger = { workspace = true }
macros = { workspace = true }
+memmap2 = { version = "0.9.4", optional = true }
message_queue = { workspace = true, optional = true }
prometheus = { workspace = true }
prost = { workspace = true }
diff --git a/src/wal/src/config.rs b/src/wal/src/config.rs
index 8a13e00c..3169a7cb 100644
--- a/src/wal/src/config.rs
+++ b/src/wal/src/config.rs
@@ -35,6 +35,12 @@ pub type KafkaStorageConfig =
crate::message_queue_impl::config::KafkaStorageCon
#[derive(Debug, Default, Clone, Deserialize, Serialize)]
pub struct KafkaStorageConfig;
+#[cfg(feature = "wal-local-storage")]
+pub type LocalStorageConfig =
crate::local_storage_impl::config::LocalStorageConfig;
+#[cfg(not(feature = "wal-local-storage"))]
+#[derive(Debug, Default, Clone, Deserialize, Serialize)]
+pub struct LocalStorageConfig;
+
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct Config {
// The flatten attribute inlines keys from a field into the parent struct.
@@ -63,4 +69,5 @@ pub enum StorageConfig {
RocksDB(Box<RocksDBStorageConfig>),
Obkv(Box<ObkvStorageConfig>),
Kafka(Box<KafkaStorageConfig>),
+ Local(Box<LocalStorageConfig>),
}
diff --git a/src/wal/src/lib.rs b/src/wal/src/lib.rs
index a03bdab1..c5731178 100644
--- a/src/wal/src/lib.rs
+++ b/src/wal/src/lib.rs
@@ -22,6 +22,8 @@
pub mod config;
mod dummy;
pub mod kv_encoder;
+#[cfg(feature = "wal-local-storage")]
+pub mod local_storage_impl;
pub mod log_batch;
pub mod manager;
#[cfg(feature = "wal-message-queue")]
diff --git a/src/wal/src/lib.rs b/src/wal/src/local_storage_impl/config.rs
similarity index 65%
copy from src/wal/src/lib.rs
copy to src/wal/src/local_storage_impl/config.rs
index a03bdab1..59a0c81b 100644
--- a/src/wal/src/lib.rs
+++ b/src/wal/src/local_storage_impl/config.rs
@@ -15,19 +15,21 @@
// specific language governing permissions and limitations
// under the License.
-//! Write Ahead Log
+use serde::{Deserialize, Serialize};
-#![feature(trait_alias)]
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct LocalStorageConfig {
+ pub path: String,
+ pub max_segment_size: u64,
+ pub cache_size: usize,
+}
-pub mod config;
-mod dummy;
-pub mod kv_encoder;
-pub mod log_batch;
-pub mod manager;
-#[cfg(feature = "wal-message-queue")]
-pub mod message_queue_impl;
-pub(crate) mod metrics;
-#[cfg(feature = "wal-rocksdb")]
-pub mod rocksdb_impl;
-#[cfg(feature = "wal-table-kv")]
-pub mod table_kv_impl;
+impl Default for LocalStorageConfig {
+ fn default() -> Self {
+ Self {
+ path: "/tmp/horaedb".to_string(),
+ max_segment_size: 64 * 1024 * 1024, // 64MB
+ cache_size: 3,
+ }
+ }
+}
diff --git a/src/wal/src/lib.rs b/src/wal/src/local_storage_impl/mod.rs
similarity index 72%
copy from src/wal/src/lib.rs
copy to src/wal/src/local_storage_impl/mod.rs
index a03bdab1..7bb5bf63 100644
--- a/src/wal/src/lib.rs
+++ b/src/wal/src/local_storage_impl/mod.rs
@@ -15,19 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-//! Write Ahead Log
-
-#![feature(trait_alias)]
-
pub mod config;
-mod dummy;
-pub mod kv_encoder;
-pub mod log_batch;
-pub mod manager;
-#[cfg(feature = "wal-message-queue")]
-pub mod message_queue_impl;
-pub(crate) mod metrics;
-#[cfg(feature = "wal-rocksdb")]
-pub mod rocksdb_impl;
-#[cfg(feature = "wal-table-kv")]
-pub mod table_kv_impl;
+mod record_encoding;
+mod segment;
+pub mod wal_manager;
diff --git a/src/wal/src/local_storage_impl/record_encoding.rs
b/src/wal/src/local_storage_impl/record_encoding.rs
new file mode 100644
index 00000000..f7c55b29
--- /dev/null
+++ b/src/wal/src/local_storage_impl/record_encoding.rs
@@ -0,0 +1,275 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use bytes_ext::{Buf, BufMut, SafeBuf, SafeBufMut};
+use codec::Encoder;
+use generic_error::GenericError;
+use macros::define_result;
+use snafu::{ensure, Backtrace, ResultExt, Snafu};
+
+pub const RECORD_ENCODING_V0: u8 = 0;
+pub const NEWEST_RECORD_ENCODING_VERSION: u8 = RECORD_ENCODING_V0;
+
+pub const VERSION_SIZE: usize = 1;
+pub const CRC_SIZE: usize = 4;
+pub const RECORD_LENGTH_SIZE: usize = 4;
+
+#[derive(Debug, Snafu)]
+pub enum Error {
+ #[snafu(display("Version mismatch, expect:{}, given:{}", expected, given))]
+ Version { expected: u8, given: u8 },
+
+ #[snafu(display("Failed to encode, err:{}", source))]
+ Encoding { source: bytes_ext::Error },
+
+ #[snafu(display("Failed to decode, err:{}", source))]
+ Decoding { source: bytes_ext::Error },
+
+ #[snafu(display("Invalid record: {}, backtrace:\n{}", source, backtrace))]
+ InvalidRecord {
+ source: GenericError,
+ backtrace: Backtrace,
+ },
+
+ #[snafu(display("Length mismatch: expected {} but found {}", expected,
actual))]
+ LengthMismatch { expected: usize, actual: usize },
+
+ #[snafu(display("Checksum mismatch: expected {}, but got {}", expected,
actual))]
+ ChecksumMismatch { expected: u32, actual: u32 },
+}
+
+define_result!(Error);
+
+/// Record format:
+///
+/// ```text
+///
+---------+--------+--------+------------+--------------+--------------+-------+
+/// | version | crc | length | table id | sequence num | value length |
value |
+/// | (u8) | (u32) | (u32) | (u64) | (u64) | (u32) |
|
+///
+---------+--------+--------+------------+--------------+--------------+-------+
+/// ```
+pub struct Record<'a> {
+ /// The version number of the record.
+ pub version: u8,
+
+ /// The CRC checksum of the record.
+ pub crc: u32,
+
+ /// The length of the record (excluding version, crc and length).
+ pub length: u32,
+
+ /// Identifier for tables.
+ pub table_id: u64,
+
+ /// Identifier for records, incrementing.
+ pub sequence_num: u64,
+
+ /// The length of the value in bytes.
+ pub value_length: u32,
+
+ /// Common log value.
+ pub value: &'a [u8],
+}
+
+impl<'a> Record<'a> {
+ pub fn new(table_id: u64, sequence_num: u64, value: &'a [u8]) ->
Result<Self> {
+ let mut record = Record {
+ version: NEWEST_RECORD_ENCODING_VERSION,
+ crc: 0,
+ length: (8 + 8 + 4 + value.len()) as u32,
+ table_id,
+ sequence_num,
+ value_length: value.len() as u32,
+ value,
+ };
+
+ // Calculate CRC
+ let mut buf = Vec::new();
+ buf.try_put_u64(table_id).context(Encoding)?;
+ buf.try_put_u64(sequence_num).context(Encoding)?;
+ buf.try_put_u32(record.value_length).context(Encoding)?;
+ buf.extend_from_slice(value);
+ record.crc = crc32fast::hash(&buf);
+
+ Ok(record)
+ }
+
+ // Return the length of the record
+ pub fn len(&self) -> usize {
+ VERSION_SIZE + CRC_SIZE + RECORD_LENGTH_SIZE + self.length as usize
+ }
+}
+
+#[derive(Clone, Debug)]
+pub struct RecordEncoding {
+ version: u8,
+}
+
+impl RecordEncoding {
+ pub fn newest() -> Self {
+ Self {
+ version: NEWEST_RECORD_ENCODING_VERSION,
+ }
+ }
+}
+
+impl Encoder<Record<'_>> for RecordEncoding {
+ type Error = Error;
+
+ fn encode<B: BufMut>(&self, buf: &mut B, record: &Record) -> Result<()> {
+ // Verify version
+ ensure!(
+ record.version == self.version,
+ Version {
+ expected: self.version,
+ given: record.version
+ }
+ );
+
+ buf.try_put_u8(record.version).context(Encoding)?;
+ buf.try_put_u32(record.crc).context(Encoding)?;
+ buf.try_put_u32(record.length).context(Encoding)?;
+ buf.try_put_u64(record.table_id).context(Encoding)?;
+ buf.try_put_u64(record.sequence_num).context(Encoding)?;
+ buf.try_put_u32(record.value_length).context(Encoding)?;
+ buf.try_put(record.value).context(Encoding)?;
+ Ok(())
+ }
+
+ fn estimate_encoded_size(&self, record: &Record) -> usize {
+ record.len()
+ }
+}
+
+impl RecordEncoding {
+ pub fn decode<'a>(&'a self, mut buf: &'a [u8]) -> Result<Record> {
+ // Ensure that buf is not shorter than the shortest record.
+ ensure!(
+ buf.remaining() >= VERSION_SIZE + CRC_SIZE + RECORD_LENGTH_SIZE,
+ LengthMismatch {
+ expected: VERSION_SIZE + CRC_SIZE + RECORD_LENGTH_SIZE,
+ actual: buf.remaining()
+ }
+ );
+
+ // Read version
+ let version = buf.try_get_u8().context(Decoding)?;
+
+ // Verify version
+ ensure!(
+ version == self.version,
+ Version {
+ expected: self.version,
+ given: version
+ }
+ );
+
+ // Read CRC
+ let crc = buf.try_get_u32().context(Decoding)?;
+
+ // Read length
+ let length = buf.try_get_u32().context(Decoding)?;
+
+ // Ensure the buf is long enough
+ ensure!(
+ buf.remaining() >= length as usize,
+ LengthMismatch {
+ expected: length as usize,
+ actual: buf.remaining()
+ }
+ );
+
+ // Verify CRC
+ let data = &buf[0..length as usize];
+ let computed_crc = crc32fast::hash(data);
+ ensure!(
+ computed_crc == crc,
+ ChecksumMismatch {
+ expected: crc,
+ actual: computed_crc
+ }
+ );
+
+ // Read table id
+ let table_id = buf.try_get_u64().context(Decoding)?;
+
+ // Read sequence number
+ let sequence_num = buf.try_get_u64().context(Decoding)?;
+
+ // Read value length
+ let value_length = buf.try_get_u32().context(Decoding)?;
+
+ // Read value
+ let value = &buf[0..value_length as usize];
+ buf.advance(value_length as usize);
+
+ Ok(Record {
+ version,
+ crc,
+ length,
+ table_id,
+ sequence_num,
+ value_length,
+ value,
+ })
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use bytes_ext::BytesMut;
+ use codec::Encoder;
+
+ use crate::local_storage_impl::record_encoding::{Record, RecordEncoding};
+
+ #[test]
+ fn test_record_encoding() {
+ let table_id = 1;
+ let sequence_num = 2;
+ let value = b"test_value";
+ let record = Record::new(table_id, sequence_num, value).unwrap();
+
+ let encoder = RecordEncoding::newest();
+ let mut buf = BytesMut::new();
+ encoder.encode(&mut buf, &record).unwrap();
+
+ let expected_len = record.len();
+ assert_eq!(buf.len(), expected_len);
+ }
+
+ #[test]
+ fn test_record_decoding() {
+ let table_id = 1;
+ let sequence_num = 2;
+ let value = b"test_value";
+ let record = Record::new(table_id, sequence_num, value).unwrap();
+
+ let encoder = RecordEncoding::newest();
+ let mut buf = BytesMut::new();
+ encoder.encode(&mut buf, &record).unwrap();
+
+ let decoded_record = encoder.decode(&buf).unwrap();
+
+ assert_eq!(decoded_record.version, record.version);
+ assert_eq!(decoded_record.crc, record.crc);
+ assert_eq!(decoded_record.length, record.length);
+ assert_eq!(decoded_record.table_id, record.table_id);
+ assert_eq!(decoded_record.sequence_num, record.sequence_num);
+ assert_eq!(decoded_record.value_length, record.value_length);
+ assert_eq!(decoded_record.value, record.value);
+ }
+}
diff --git a/src/wal/src/local_storage_impl/segment.rs
b/src/wal/src/local_storage_impl/segment.rs
new file mode 100644
index 00000000..f44ea131
--- /dev/null
+++ b/src/wal/src/local_storage_impl/segment.rs
@@ -0,0 +1,900 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{
+ collections::{HashMap, VecDeque},
+ fmt::Debug,
+ fs,
+ fs::{File, OpenOptions},
+ io,
+ io::Write,
+ path::Path,
+ sync::{
+ atomic::{AtomicU64, Ordering},
+ Arc, Mutex,
+ },
+};
+
+use codec::Encoder;
+use common_types::{table::TableId, SequenceNumber, MAX_SEQUENCE_NUMBER,
MIN_SEQUENCE_NUMBER};
+use generic_error::{BoxError, GenericError};
+use macros::define_result;
+use memmap2::{MmapMut, MmapOptions};
+use runtime::Runtime;
+use snafu::{ensure, Backtrace, ResultExt, Snafu};
+
+use crate::{
+ kv_encoder::CommonLogEncoding,
+ local_storage_impl::record_encoding::{Record, RecordEncoding},
+ log_batch::{LogEntry, LogWriteBatch},
+ manager::{
+ BatchLogIteratorAdapter, Read, ReadContext, ReadRequest, RegionId,
ScanContext,
+ ScanRequest, SyncLogIterator, WalLocation, WriteContext,
+ },
+};
+
+#[derive(Debug, Snafu)]
+pub enum Error {
+ #[snafu(display("Failed to open or create file: {}", source))]
+ FileOpen { source: io::Error },
+
+ #[snafu(display("Failed to open or create dir: {}", source))]
+ DirOpen { source: io::Error },
+
+ #[snafu(display("Failed to map file to memory: {}", source))]
+ Mmap { source: io::Error },
+
+ #[snafu(display("Segment full"))]
+ SegmentFull,
+
+ #[snafu(display("Failed to append data to segment file: {}", source))]
+ SegmentAppend { source: io::Error },
+
+ #[snafu(display("Failed to flush mmap: {}", source))]
+ Flush { source: io::Error },
+
+ #[snafu(display(
+ "Attempted to read beyond segment size. Offset: {}, Size: {},
FileSize: {}",
+ offset,
+ size,
+ file_size
+ ))]
+ ReadOutOfBounds {
+ offset: u64,
+ size: u64,
+ file_size: u64,
+ },
+
+ #[snafu(display("Invalid segment header"))]
+ InvalidHeader,
+
+ #[snafu(display("Segment not open, id:{}", id))]
+ SegmentNotOpen { id: u64 },
+
+ #[snafu(display("Segment not found, id:{}", id))]
+ SegmentNotFound { id: u64 },
+
+ #[snafu(display("Unable to convert slice: {}", source))]
+ Conversion {
+ source: std::array::TryFromSliceError,
+ },
+
+ #[snafu(display("{}", source))]
+ Encoding { source: GenericError },
+
+ #[snafu(display("Invalid record: {}, backtrace:\n{}", source, backtrace))]
+ InvalidRecord {
+ source: GenericError,
+ backtrace: Backtrace,
+ },
+
+ #[snafu(display("Length mismatch: expected {} but found {}", expected,
actual))]
+ LengthMismatch { expected: usize, actual: usize },
+
+ #[snafu(display("Checksum mismatch: expected {}, but got {}", expected,
actual))]
+ ChecksumMismatch { expected: u32, actual: u32 },
+}
+
+define_result!(Error);
+
+const SEGMENT_HEADER: &[u8] = b"HoraeDBWAL";
+const WAL_SEGMENT_V0: u8 = 0;
+const NEWEST_WAL_SEGMENT_VERSION: u8 = WAL_SEGMENT_V0;
+const VERSION_SIZE: usize = 1;
+
+// todo: make MAX_FILE_SIZE configurable
+const MAX_FILE_SIZE: u64 = 64 * 1024 * 1024;
+
+/// Segment file format:
+///
+/// ```text
+/// +-------------+--------+--------+--------+---+--------+
+/// | Version(u8) | Header | Record | Record |...| Record |
+/// +-------------+--------+--------+--------+---+--------+
+/// ```
+///
+/// The `Header` is a fixed string. The format of the `Record` can be
referenced
+/// in the struct `Record`.
+#[derive(Debug)]
+pub struct Segment {
+ /// The version of the segment.
+ version: u8,
+
+ /// The file path of the segment.
+ path: String,
+
+ /// A unique identifier for the segment.
+ id: u64,
+
+ /// The size of the segment in bytes.
+ size: u64,
+
+ /// The minimum sequence number of records within this segment.
+ min_seq: SequenceNumber,
+
+ /// The maximum sequence number of records within this segment.
+ max_seq: SequenceNumber,
+
+ /// The encoding format used for records within this segment.
+ record_encoding: RecordEncoding,
+
+ /// An optional file handle for the segment.
+ /// This may be `None` if the file is not currently open.
+ file: Option<File>,
+
+ /// An optional memory-mapped mutable buffer of the segment.
+ /// This may be `None` if the segment is not memory-mapped.
+ mmap: Option<MmapMut>,
+
+ /// An optional vector of positions within the segment.
+ /// This may be `None` if the segment is not memory-mapped.
+ record_position: Option<Vec<Position>>,
+}
+
+#[derive(Debug, Clone)]
+pub struct Position {
+ start: u64,
+ end: u64,
+}
+
+impl Segment {
+ pub fn new(path: String, segment_id: u64) -> Result<Segment> {
+ if !Path::new(&path).exists() {
+ let mut file = File::create(&path).context(FileOpen)?;
+ file.write_all(&[NEWEST_WAL_SEGMENT_VERSION])
+ .context(FileOpen)?;
+ file.write_all(SEGMENT_HEADER).context(FileOpen)?;
+ }
+ Ok(Segment {
+ version: NEWEST_WAL_SEGMENT_VERSION,
+ path,
+ id: segment_id,
+ size: SEGMENT_HEADER.len() as u64,
+ min_seq: MAX_SEQUENCE_NUMBER,
+ max_seq: MIN_SEQUENCE_NUMBER,
+ record_encoding: RecordEncoding::newest(),
+ file: None,
+ mmap: None,
+ record_position: None,
+ })
+ }
+
+ pub fn open(&mut self) -> Result<()> {
+ // Open the segment file
+ let file = OpenOptions::new()
+ .read(true)
+ .append(true)
+ .open(&self.path)
+ .context(FileOpen)?;
+
+ let metadata = file.metadata().context(FileOpen)?;
+ let size = metadata.len();
+
+ // Map the file in memory
+ let mmap = unsafe { MmapOptions::new().map_mut(&file).context(Mmap)? };
+
+ // Validate segment version
+ let version = mmap[0];
+ ensure!(version == self.version, InvalidHeader);
+
+ // Validate segment header
+ let header_len = SEGMENT_HEADER.len();
+ ensure!(size >= header_len as u64, InvalidHeader);
+ let header = &mmap[VERSION_SIZE..VERSION_SIZE + header_len];
+ ensure!(header == SEGMENT_HEADER, InvalidHeader);
+
+ // Read and validate all records
+ let mut pos = VERSION_SIZE + header_len;
+ let mut record_position = Vec::new();
+ while pos < size as usize {
+ let data = &mmap[pos..];
+
+ let record = self
+ .record_encoding
+ .decode(data)
+ .box_err()
+ .context(InvalidRecord)?;
+
+ record_position.push(Position {
+ start: pos as u64,
+ end: (pos + record.len()) as u64,
+ });
+
+ // Move to the next record
+ pos += record.len();
+ }
+
+ self.file = Some(file);
+ self.mmap = Some(mmap);
+ self.record_position = Some(record_position);
+ self.size = size;
+ Ok(())
+ }
+
+ pub fn close(&mut self) -> Result<()> {
+ self.file.take();
+ self.mmap.take();
+ self.record_position.take();
+ Ok(())
+ }
+
+ /// Append a slice to the segment file.
+ pub fn append(&mut self, data: &[u8]) -> Result<()> {
+ ensure!(self.size + data.len() as u64 <= MAX_FILE_SIZE, SegmentFull);
+
+ // Ensure the segment file is open
+ let Some(file) = &mut self.file else {
+ return SegmentNotOpen { id: self.id }.fail();
+ };
+
+ // Append to the file
+ file.write_all(data).context(SegmentAppend)?;
+ file.flush().context(Flush)?;
+
+ // Remap
+ // todo: Do not remap every time you append; instead, create a large
enough file
+ // at the beginning.
+ let mmap = unsafe { MmapOptions::new().map_mut(&*file).context(Mmap)?
};
+ self.mmap = Some(mmap);
+ self.size += data.len() as u64;
+
+ Ok(())
+ }
+
+ pub fn read(&self, offset: u64, size: u64) -> Result<Vec<u8>> {
+ // Ensure that the reading range is within the file
+ ensure!(
+ offset + size <= self.size,
+ LengthMismatch {
+ expected: (offset + size) as usize,
+ actual: self.size as usize
+ }
+ );
+
+ let start = offset as usize;
+ let end = start + size as usize;
+ match &self.mmap {
+ Some(mmap) => Ok(mmap[start..end].to_vec()),
+ None => SegmentNotOpen { id: self.id }.fail(),
+ }
+ }
+
+ pub fn append_record_position(&mut self, pos: &mut Vec<Position>) ->
Result<()> {
+ match self.record_position.as_mut() {
+ Some(record_position) => {
+ record_position.append(pos);
+ Ok(())
+ }
+ None => SegmentNotOpen { id: self.id }.fail(),
+ }
+ }
+
+ pub fn update_seq(&mut self, min_seq: u64, max_seq: u64) -> Result<()> {
+ if min_seq < self.min_seq {
+ self.min_seq = min_seq;
+ }
+ if max_seq > self.max_seq {
+ self.max_seq = max_seq;
+ }
+ Ok(())
+ }
+}
+
+pub struct Region {
+ /// Identifier for regions.
+ _region_id: u64,
+
+ /// All segments protected by a mutex
+ /// todo: maybe use a RWLock?
+ all_segments: Mutex<HashMap<u64, Arc<Mutex<Segment>>>>,
+
+ /// Cache for opened segments
+ cache: Mutex<VecDeque<u64>>,
+
+ /// Maximum size of the cache
+ cache_size: usize,
+
+ /// Directory for segment storage
+ _segment_dir: String,
+
+ /// Index of the latest segment for appending logs
+ current: Mutex<u64>,
+
+ /// Encoding method for logs
+ log_encoding: CommonLogEncoding,
+
+ /// Encoding method for records
+ record_encoding: RecordEncoding,
+
+ /// Sequence number for the next log
+ next_sequence_num: AtomicU64,
+
+ /// Runtime for handling write requests
+ runtime: Arc<Runtime>,
+}
+
+impl Region {
+ pub fn new(
+ region_id: u64,
+ cache_size: usize,
+ segment_dir: String,
+ runtime: Arc<Runtime>,
+ ) -> Result<Self> {
+ let mut all_segments = HashMap::new();
+
+ // Scan the directory for existing WAL files
+ let mut max_segment_id: i32 = -1;
+
+ // Segment file naming convention: segment_<id>.wal
+ for entry in fs::read_dir(&segment_dir).context(FileOpen)? {
+ let entry = entry.context(FileOpen)?;
+
+ let path = entry.path();
+
+ if !path.is_file() {
+ continue;
+ }
+
+ match path.extension() {
+ Some(ext) if ext == "wal" => ext,
+ _ => continue,
+ };
+
+ let file_name = match path.file_name().and_then(|name|
name.to_str()) {
+ Some(name) => name,
+ None => continue,
+ };
+
+ let segment_id = match file_name
+ .trim_start_matches("segment_")
+ .trim_end_matches(".wal")
+ .parse::<u64>()
+ .ok()
+ {
+ Some(id) => id,
+ None => continue,
+ };
+
+ let segment = Segment::new(path.to_string_lossy().to_string(),
segment_id)?;
+ let segment = Arc::new(Mutex::new(segment));
+
+ if segment_id as i32 > max_segment_id {
+ max_segment_id = segment_id as i32;
+ }
+ all_segments.insert(segment_id, segment);
+ }
+
+ // If no existing segments, create a new one
+ if max_segment_id == -1 {
+ max_segment_id = 0;
+ let path = format!("{}/segment_{}.wal", segment_dir,
max_segment_id);
+ let new_segment = Segment::new(path, max_segment_id as u64)?;
+ let new_segment = Arc::new(Mutex::new(new_segment));
+ all_segments.insert(0, new_segment);
+ }
+
+ Ok(Self {
+ _region_id: region_id,
+ all_segments: Mutex::new(all_segments),
+ cache: Mutex::new(VecDeque::new()),
+ cache_size,
+ _segment_dir: segment_dir,
+ current: Mutex::new(max_segment_id as u64),
+ log_encoding: CommonLogEncoding::newest(),
+ record_encoding: RecordEncoding::newest(),
+ // todo: do not use MIN_SEQUENCE_NUMBER, read from the latest
record
+ next_sequence_num: AtomicU64::new(MIN_SEQUENCE_NUMBER + 1),
+ runtime,
+ })
+ }
+
+ /// Obtain the target segment. If it is not open, then open it and put it
to
+ /// the cache.
+ fn get_segment(&self, segment_id: u64) -> Result<Arc<Mutex<Segment>>> {
+ let mut cache = self.cache.lock().unwrap();
+ let all_segments = self.all_segments.lock().unwrap();
+
+ let segment = all_segments.get(&segment_id);
+
+ let segment = match segment {
+ Some(segment) => segment,
+ None => return SegmentNotFound { id: segment_id }.fail(),
+ };
+
+ // Check if segment is already in cache
+ if cache.iter().any(|id| *id == segment_id) {
+ let segment = all_segments.get(&segment_id);
+ return match segment {
+ Some(segment) => Ok(segment.clone()),
+ None => SegmentNotFound { id: segment_id }.fail(),
+ };
+ }
+
+ // If not in cache, load from disk
+ segment.lock().unwrap().open()?;
+
+ // Add to cache
+ if cache.len() == self.cache_size {
+ let evicted_segment_id = cache.pop_front();
+ // TODO: if the evicted segment is being read or written, wait for
it to finish
+ if let Some(evicted_segment_id) = evicted_segment_id {
+ let evicted_segment = all_segments.get(&evicted_segment_id);
+ if let Some(evicted_segment) = evicted_segment {
+ evicted_segment.lock().unwrap().close()?;
+ } else {
+ return SegmentNotFound {
+ id: evicted_segment_id,
+ }
+ .fail();
+ }
+ }
+ }
+ cache.push_back(segment_id);
+
+ Ok(segment.clone())
+ }
+
+ pub fn write(&self, _ctx: &WriteContext, batch: &LogWriteBatch) ->
Result<SequenceNumber> {
+ // Lock
+ let current = self.current.lock().unwrap();
+ let segment = self.get_segment(*current)?;
+ let mut segment = segment.lock().unwrap();
+
+ let entries_num = batch.len() as u64;
+ let table_id = batch.location.table_id;
+
+ // Allocate sequence number
+ let prev_sequence_num = self.alloc_sequence_num(entries_num);
+ let mut next_sequence_num = prev_sequence_num;
+
+ let mut data = Vec::new();
+ let mut record_position = Vec::new();
+
+ for entry in &batch.entries {
+ // Encode the record
+ let record = Record::new(table_id, next_sequence_num,
&entry.payload)
+ .box_err()
+ .context(Encoding)?;
+ self.record_encoding
+ .encode(&mut data, &record)
+ .box_err()
+ .context(Encoding)?;
+
+ record_position.push(Position {
+ start: (data.len() - record.len()) as u64,
+ end: data.len() as u64,
+ });
+
+ next_sequence_num += 1;
+ }
+
+ // TODO: spawn a new task to write to segment
+ // TODO: maybe need a write mutex?
+
+ for pos in record_position.iter_mut() {
+ pos.start += segment.size;
+ pos.end += segment.size;
+ }
+
+ // Update the record position
+ segment.append_record_position(&mut record_position)?;
+
+ // Update the min and max sequence numbers
+ segment.update_seq(prev_sequence_num, next_sequence_num - 1)?;
+
+ // Append logs to segment file
+ segment.append(&data)?;
+ Ok(next_sequence_num - 1)
+ }
+
+ pub fn read(&self, ctx: &ReadContext, req: &ReadRequest) ->
Result<BatchLogIteratorAdapter> {
+ // Check read range's validity.
+ let start = if let Some(start) = req.start.as_start_sequence_number() {
+ start
+ } else {
+ MAX_SEQUENCE_NUMBER
+ };
+ let end = if let Some(end) = req.end.as_end_sequence_number() {
+ end
+ } else {
+ MIN_SEQUENCE_NUMBER
+ };
+ if start > end {
+ return Ok(BatchLogIteratorAdapter::empty());
+ }
+ let iter = SegmentLogIterator::new(
+ self.log_encoding.clone(),
+ self.record_encoding.clone(),
+ self.get_segment(0)?,
+ Some(req.location.table_id),
+ start,
+ end,
+ );
+ Ok(BatchLogIteratorAdapter::new_with_sync(
+ Box::new(iter),
+ self.runtime.clone(),
+ ctx.batch_size,
+ ))
+ }
+
+ pub fn scan(&self, ctx: &ScanContext, _req: &ScanRequest) ->
Result<BatchLogIteratorAdapter> {
+ let iter = SegmentLogIterator::new(
+ self.log_encoding.clone(),
+ self.record_encoding.clone(),
+ self.get_segment(0)?,
+ None,
+ MIN_SEQUENCE_NUMBER,
+ MAX_SEQUENCE_NUMBER,
+ );
+ Ok(BatchLogIteratorAdapter::new_with_sync(
+ Box::new(iter),
+ self.runtime.clone(),
+ ctx.batch_size,
+ ))
+ }
+
+ pub fn mark_delete_entries_up_to(
+ &self,
+ _location: WalLocation,
+ _sequence_num: SequenceNumber,
+ ) -> Result<()> {
+ todo!()
+ }
+
+ #[inline]
+ fn alloc_sequence_num(&self, number: u64) -> SequenceNumber {
+ self.next_sequence_num.fetch_add(number, Ordering::Relaxed)
+ }
+
+ pub fn sequence_num(&self, _location: WalLocation) ->
Result<SequenceNumber> {
+ let next_seq_num = self.next_sequence_num.load(Ordering::Relaxed);
+ debug_assert!(next_seq_num > 0);
+ Ok(next_seq_num - 1)
+ }
+}
+
+pub struct RegionManager {
+ root_dir: String,
+ regions: Mutex<HashMap<u64, Arc<Region>>>,
+ cache_size: usize,
+ runtime: Arc<Runtime>,
+}
+
+impl RegionManager {
+ // Create a RegionManager, and scans all the region folders located under
+ // root_dir.
+ pub fn new(root_dir: String, cache_size: usize, runtime: Arc<Runtime>) ->
Result<Self> {
+ let mut regions = HashMap::new();
+
+ // Naming convention: <root_dir>/<region_id>
+ for entry in fs::read_dir(&root_dir).context(DirOpen)? {
+ let entry = entry.context(DirOpen)?;
+
+ let path = entry.path();
+ if path.is_file() {
+ continue;
+ }
+
+ let dir_name = match path.file_name().and_then(|name|
name.to_str()) {
+ Some(name) => name,
+ None => continue,
+ };
+
+ // Parse region id from dir name
+ let region_id = match dir_name.parse::<u64>().ok() {
+ Some(id) => id,
+ None => continue,
+ };
+
+ let region = Region::new(
+ region_id,
+ cache_size,
+ path.to_string_lossy().to_string(),
+ runtime.clone(),
+ )?;
+ regions.insert(region_id, Arc::new(region));
+ }
+
+ Ok(Self {
+ root_dir,
+ regions: Mutex::new(regions),
+ cache_size,
+ runtime,
+ })
+ }
+
+ /// Retrieve a region by its `region_id`. If the region does not exist,
+ /// create a new one.
+ fn get_region(&self, region_id: RegionId) -> Result<Arc<Region>> {
+ let mut regions = self.regions.lock().unwrap();
+ if let Some(region) = regions.get(&region_id) {
+ return Ok(region.clone());
+ }
+
+ let region_dir = Path::new(&self.root_dir).join(region_id.to_string());
+ fs::create_dir_all(&region_dir).context(DirOpen)?;
+
+ let region = Region::new(
+ region_id,
+ self.cache_size,
+ region_dir.to_string_lossy().to_string(),
+ self.runtime.clone(),
+ )?;
+
+ Ok(regions.entry(region_id).or_insert(Arc::new(region)).clone())
+ }
+
+ pub fn write(&self, ctx: &WriteContext, batch: &LogWriteBatch) ->
Result<SequenceNumber> {
+ let region = self.get_region(batch.location.region_id)?;
+ region.write(ctx, batch)
+ }
+
+ pub fn read(&self, ctx: &ReadContext, req: &ReadRequest) ->
Result<BatchLogIteratorAdapter> {
+ let region = self.get_region(req.location.region_id)?;
+ region.read(ctx, req)
+ }
+
+ pub fn scan(&self, ctx: &ScanContext, req: &ScanRequest) ->
Result<BatchLogIteratorAdapter> {
+ let region = self.get_region(req.region_id)?;
+ region.scan(ctx, req)
+ }
+
+ pub fn mark_delete_entries_up_to(
+ &self,
+ location: WalLocation,
+ sequence_num: SequenceNumber,
+ ) -> Result<()> {
+ let region = self.get_region(location.region_id)?;
+ region.mark_delete_entries_up_to(location, sequence_num)
+ }
+
+ pub fn sequence_num(&self, location: WalLocation) ->
Result<SequenceNumber> {
+ let region = self.get_region(location.region_id)?;
+ region.sequence_num(location)
+ }
+}
+
+// TODO: handle the case when read requests involving multiple segments
+#[derive(Debug)]
+pub struct SegmentLogIterator {
+ /// Encoding method for common log.
+ log_encoding: CommonLogEncoding,
+
+ /// Encoding method for records.
+ record_encoding: RecordEncoding,
+
+ /// Thread-safe, shared reference to the log segment.
+ segment: Arc<Mutex<Segment>>,
+
+ /// Optional identifier for the table, which is used to filter logs.
+ table_id: Option<TableId>,
+
+ /// Starting sequence number for log iteration.
+ start: SequenceNumber,
+
+ /// Ending sequence number for log iteration.
+ end: SequenceNumber,
+
+ /// Index of the current record within the segment.
+ current_record_idx: usize,
+
+ /// The raw payload data of the current record.
+ current_payload: Vec<u8>,
+
+ /// Flag indicating whether there is no more data to read.
+ no_more_data: bool,
+}
+
+impl SegmentLogIterator {
+ pub fn new(
+ log_encoding: CommonLogEncoding,
+ record_encoding: RecordEncoding,
+ segment: Arc<Mutex<Segment>>,
+ table_id: Option<TableId>,
+ start: SequenceNumber,
+ end: SequenceNumber,
+ ) -> Self {
+ SegmentLogIterator {
+ log_encoding,
+ record_encoding,
+ segment,
+ table_id,
+ start,
+ end,
+ current_record_idx: 0,
+ current_payload: Vec::new(),
+ no_more_data: false,
+ }
+ }
+
+ fn next(&mut self) -> Result<Option<LogEntry<&'_ [u8]>>> {
+ if self.no_more_data {
+ return Ok(None);
+ }
+
+ // todo: ensure that this segment is not evicted from the cache during
the read
+ // process, or that it is reloaded into the cache as needed
+ let segment = self.segment.lock().unwrap();
+
+ loop {
+ let Some(record_position) = &segment.record_position else {
+ self.no_more_data = true;
+ return Ok(None);
+ };
+ let Some(pos) = record_position.get(self.current_record_idx) else {
+ self.no_more_data = true;
+ return Ok(None);
+ };
+
+ self.current_record_idx += 1;
+ let record = segment.read(pos.start, pos.end - pos.start)?;
+
+ // Decode record
+ let record = self
+ .record_encoding
+ .decode(record.as_slice())
+ .box_err()
+ .context(InvalidRecord)?;
+
+ // Filter by sequence number
+ if record.sequence_num < self.start {
+ continue;
+ }
+ if record.sequence_num > self.end {
+ self.no_more_data = true;
+ return Ok(None);
+ }
+
+ // Filter by table_id
+ if let Some(table_id) = self.table_id {
+ if record.table_id != table_id {
+ continue;
+ }
+ }
+
+ // Decode value
+ let value = self
+ .log_encoding
+ .decode_value(record.value)
+ .box_err()
+ .context(InvalidRecord)?;
+
+ self.current_payload = value.to_owned();
+
+ return Ok(Some(LogEntry {
+ table_id: record.table_id,
+ sequence: record.sequence_num,
+ payload: self.current_payload.as_slice(),
+ }));
+ }
+ }
+}
+
+impl SyncLogIterator for SegmentLogIterator {
+ fn next_log_entry(&mut self) -> crate::manager::Result<Option<LogEntry<&'_
[u8]>>> {
+ self.next().box_err().context(Read)
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use std::sync::Arc;
+
+ use runtime::Builder;
+ use tempfile::tempdir;
+
+ use super::*;
+
+ #[test]
+ fn test_segment_creation() {
+ let dir = tempdir().unwrap();
+ let path = dir
+ .path()
+ .join("segment_0.wal")
+ .to_str()
+ .unwrap()
+ .to_string();
+
+ let segment = Segment::new(path.clone(), 0);
+ assert!(segment.is_ok());
+
+ let segment = segment.unwrap();
+ assert_eq!(segment.version, NEWEST_WAL_SEGMENT_VERSION);
+ assert_eq!(segment.path, path);
+ assert_eq!(segment.id, 0);
+ assert_eq!(segment.size, SEGMENT_HEADER.len() as u64);
+
+ let segment_content = fs::read(path).unwrap();
+ assert_eq!(segment_content[0], NEWEST_WAL_SEGMENT_VERSION);
+ assert_eq!(
+ &segment_content[VERSION_SIZE..VERSION_SIZE +
SEGMENT_HEADER.len()],
+ SEGMENT_HEADER
+ );
+ }
+
+ #[test]
+ fn test_segment_open() {
+ let dir = tempdir().unwrap();
+ let path = dir
+ .path()
+ .join("segment_0.wal")
+ .to_str()
+ .unwrap()
+ .to_string();
+ let mut segment = Segment::new(path.clone(), 0).unwrap();
+
+ let result = segment.open();
+ assert!(result.is_ok());
+ }
+
+ #[test]
+ fn test_segment_append_and_read() {
+ let dir = tempdir().unwrap();
+ let path = dir
+ .path()
+ .join("segment_0.wal")
+ .to_str()
+ .unwrap()
+ .to_string();
+ let mut segment = Segment::new(path.clone(), 0).unwrap();
+ segment.open().unwrap();
+
+ let data = b"test_data";
+ let append_result = segment.append(data);
+ assert!(append_result.is_ok());
+
+ let read_result = segment.read(
+ (VERSION_SIZE + SEGMENT_HEADER.len()) as u64,
+ data.len() as u64,
+ );
+ assert!(read_result.is_ok());
+ assert_eq!(read_result.unwrap(), data);
+ }
+
+ #[test]
+ fn test_region_creation() {
+ let dir = tempdir().unwrap();
+ let runtime = Arc::new(Builder::default().build().unwrap());
+
+ let segment_manager = Region::new(1, 1,
dir.path().to_str().unwrap().to_string(), runtime);
+ assert!(segment_manager.is_ok());
+
+ let segment_manager = segment_manager.unwrap();
+ let segment = segment_manager.get_segment(0);
+ assert!(segment.is_ok());
+ }
+}
diff --git a/src/wal/src/local_storage_impl/wal_manager.rs
b/src/wal/src/local_storage_impl/wal_manager.rs
new file mode 100644
index 00000000..d05e90d8
--- /dev/null
+++ b/src/wal/src/local_storage_impl/wal_manager.rs
@@ -0,0 +1,195 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{
+ fmt,
+ fmt::{Debug, Formatter},
+ path::{Path, PathBuf},
+ sync::Arc,
+};
+
+use async_trait::async_trait;
+use common_types::SequenceNumber;
+use generic_error::BoxError;
+use logger::{debug, info};
+use runtime::Runtime;
+use snafu::ResultExt;
+
+use crate::{
+ config::{Config, StorageConfig},
+ local_storage_impl::{config::LocalStorageConfig, segment::RegionManager},
+ log_batch::LogWriteBatch,
+ manager::{
+ error::*, BatchLogIteratorAdapter, Open, OpenedWals, ReadContext,
ReadRequest, RegionId,
+ ScanContext, ScanRequest, WalLocation, WalManager, WalManagerRef,
WalRuntimes, WalsOpener,
+ WriteContext, MANIFEST_DIR_NAME, WAL_DIR_NAME,
+ },
+};
+
+pub struct LocalStorageImpl {
+ config: LocalStorageConfig,
+ _runtime: Arc<Runtime>,
+ segment_manager: RegionManager,
+}
+
+impl LocalStorageImpl {
+ pub fn new(
+ wal_path: PathBuf,
+ config: LocalStorageConfig,
+ runtime: Arc<Runtime>,
+ ) -> Result<Self> {
+ let LocalStorageConfig { cache_size, .. } = config.clone();
+ let wal_path_str = wal_path.to_str().unwrap().to_string();
+ let segment_manager = RegionManager::new(wal_path_str.clone(),
cache_size, runtime.clone())
+ .box_err()
+ .context(Open {
+ wal_path: wal_path_str,
+ })?;
+ Ok(Self {
+ config,
+ _runtime: runtime,
+ segment_manager,
+ })
+ }
+}
+
+impl Drop for LocalStorageImpl {
+ fn drop(&mut self) {
+ info!("LocalStorage dropped, config:{:?}", self.config);
+ }
+}
+
+impl Debug for LocalStorageImpl {
+ fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+ f.debug_struct("LocalStorageImpl")
+ .field("config", &self.config)
+ .finish()
+ }
+}
+
+#[async_trait]
+impl WalManager for LocalStorageImpl {
+ async fn sequence_num(&self, location: WalLocation) -> Result<u64> {
+ self.segment_manager
+ .sequence_num(location)
+ .box_err()
+ .context(Read)
+ }
+
+ async fn mark_delete_entries_up_to(
+ &self,
+ location: WalLocation,
+ sequence_num: SequenceNumber,
+ ) -> Result<()> {
+ self.segment_manager
+ .mark_delete_entries_up_to(location, sequence_num)
+ .box_err()
+ .context(Delete)
+ }
+
+ async fn close_region(&self, region_id: RegionId) -> Result<()> {
+ debug!(
+ "Close region for LocalStorage based WAL is noop operation,
region_id:{}",
+ region_id
+ );
+
+ Ok(())
+ }
+
+ async fn close_gracefully(&self) -> Result<()> {
+ info!("Close local storage wal gracefully");
+ // todo: close all opened files
+ Ok(())
+ }
+
+ async fn read_batch(
+ &self,
+ ctx: &ReadContext,
+ req: &ReadRequest,
+ ) -> Result<BatchLogIteratorAdapter> {
+ self.segment_manager.read(ctx, req).box_err().context(Read)
+ }
+
+ async fn write(&self, ctx: &WriteContext, batch: &LogWriteBatch) ->
Result<SequenceNumber> {
+ self.segment_manager
+ .write(ctx, batch)
+ .box_err()
+ .context(Write)
+ }
+
+ async fn scan(&self, ctx: &ScanContext, req: &ScanRequest) ->
Result<BatchLogIteratorAdapter> {
+ self.segment_manager.scan(ctx, req).box_err().context(Read)
+ }
+
+ async fn get_statistics(&self) -> Option<String> {
+ None
+ }
+}
+
+#[derive(Default)]
+pub struct LocalStorageWalsOpener;
+
+impl LocalStorageWalsOpener {
+ fn build_manager(
+ wal_path: PathBuf,
+ runtime: Arc<Runtime>,
+ config: LocalStorageConfig,
+ ) -> Result<WalManagerRef> {
+ Ok(Arc::new(LocalStorageImpl::new(wal_path, config, runtime)?))
+ }
+}
+
+#[async_trait]
+impl WalsOpener for LocalStorageWalsOpener {
+ async fn open_wals(&self, config: &Config, runtimes: WalRuntimes) ->
Result<OpenedWals> {
+ let local_storage_wal_config = match &config.storage {
+ StorageConfig::Local(config) => config.clone(),
+ _ => {
+ return InvalidWalConfig {
+ msg: format!(
+ "invalid wal storage config while opening local
storage wal, config:{config:?}"
+ ),
+ }
+ .fail();
+ }
+ };
+
+ let write_runtime = runtimes.write_runtime.clone();
+ let data_path = Path::new(&local_storage_wal_config.path);
+
+ let data_wal = if config.disable_data {
+ Arc::new(crate::dummy::DoNothing)
+ } else {
+ Self::build_manager(
+ data_path.join(WAL_DIR_NAME),
+ write_runtime.clone(),
+ *local_storage_wal_config.clone(),
+ )?
+ };
+
+ let manifest_wal = Self::build_manager(
+ data_path.join(MANIFEST_DIR_NAME),
+ write_runtime.clone(),
+ *local_storage_wal_config.clone(),
+ )?;
+
+ Ok(OpenedWals {
+ data_wal,
+ manifest_wal,
+ })
+ }
+}
diff --git a/src/wal/tests/read_write.rs b/src/wal/tests/read_write.rs
index 483bfee5..cccde53c 100644
--- a/src/wal/tests/read_write.rs
+++ b/src/wal/tests/read_write.rs
@@ -35,6 +35,7 @@ use tempfile::TempDir;
use time_ext::ReadableDuration;
use wal::{
kv_encoder::LogBatchEncoder,
+ local_storage_impl::{config::LocalStorageConfig,
wal_manager::LocalStorageImpl},
log_batch::{LogWriteBatch, MemoryPayload, MemoryPayloadDecoder},
manager::{
BatchLogIteratorAdapter, ReadBoundary, ReadContext, ReadRequest,
ScanRequest, WalLocation,
@@ -70,6 +71,13 @@ fn test_kafka_wal() {
test_all(builder, true);
}
+#[test]
+#[ignore = "this test cannot pass completely, since delete is not supported
yet"]
+fn test_local_storage_wal() {
+ let builder = LocalStorageWalBuilder;
+ test_all(builder, false);
+}
+
fn test_all<B: WalBuilder>(builder: B, is_distributed: bool) {
test_simple_read_write_default_batch(builder.clone());
test_simple_read_write_different_batch_size(builder.clone());
@@ -980,6 +988,22 @@ impl Clone for KafkaWalBuilder {
}
}
+#[derive(Clone, Default)]
+pub struct LocalStorageWalBuilder;
+
+#[async_trait]
+impl WalBuilder for LocalStorageWalBuilder {
+ type Wal = LocalStorageImpl;
+
+ async fn build(&self, data_path: &Path, runtime: Arc<Runtime>) ->
Arc<Self::Wal> {
+ let config = LocalStorageConfig {
+ path: data_path.to_str().unwrap().to_string(),
+ ..LocalStorageConfig::default()
+ };
+ Arc::new(LocalStorageImpl::new(data_path.to_path_buf(), config,
runtime).unwrap())
+ }
+}
+
/// The environment for testing wal.
pub struct TestEnv<B> {
pub dir: TempDir,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]