This is an automated email from the ASF dual-hosted git repository.
jiacai2050 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/horaedb.git
The following commit(s) were added to refs/heads/main by this push:
new 645a8b35 feat: Implement delete operation for WAL based on local
storage (#1566)
645a8b35 is described below
commit 645a8b3510a0a6e43c8b6e0688766470040ef164
Author: Draco <[email protected]>
AuthorDate: Sat Sep 14 09:59:40 2024 +0800
feat: Implement delete operation for WAL based on local storage (#1566)
## Rationale
Currently, the WAL based on local disk storage does not support the delete
operation. This PR implements it.
This is a follow-up to #1552 and #1556.
## Detailed Changes
1. Add a hashmap to each `Segment` that records, for every table, the minimum
and maximum sequence numbers of that table's logs within the segment. The
hashmap is updated by `write` and `delete` operations, and read operations use
it to skip logs that fall outside the recorded ranges (i.e. deleted logs).
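2. During a `delete` operation, if the hashmap shows that all logs of all
tables in a read-only segment (a segment that is not currently being written
to) have been marked as deleted, the segment file is physically removed from
disk.
3. Rename the `LocalStorageConfig` fields `path` to `data_dir` and
`max_segment_size` to `segment_size`; fields missing from the config now fall
back to their default values (`#[serde(default)]`).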
## Test Plan
Unit tests, TSBS, and a local script that repeatedly inserts data, forcibly
kills the database process, and restarts it to verify persistence.
---
Cargo.lock | 1 +
src/analytic_engine/src/instance/write.rs | 15 -
src/wal/Cargo.toml | 1 +
src/wal/src/local_storage_impl/config.rs | 9 +-
src/wal/src/local_storage_impl/segment.rs | 528 +++++++++++++++++++-------
src/wal/src/local_storage_impl/wal_manager.rs | 24 +-
src/wal/tests/read_write.rs | 3 +-
7 files changed, 409 insertions(+), 172 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 94c22e7a..43d74e7e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -8219,6 +8219,7 @@ checksum =
"9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca"
name = "wal"
version = "2.0.0"
dependencies = [
+ "anyhow",
"async-trait",
"bytes_ext",
"chrono",
diff --git a/src/analytic_engine/src/instance/write.rs
b/src/analytic_engine/src/instance/write.rs
index 95f2efce..8a007d5c 100644
--- a/src/analytic_engine/src/instance/write.rs
+++ b/src/analytic_engine/src/instance/write.rs
@@ -531,21 +531,6 @@ impl<'a> Writer<'a> {
e
})?;
- // When wal is disabled, there is no need to do this check.
- if !self.instance.disable_wal {
- // NOTE: Currently write wal will only increment seq by one,
- // this may change in future.
- let last_seq = table_data.last_sequence();
- if sequence != last_seq + 1 {
- warn!(
- "Sequence must be consecutive, table:{}, table_id:{},
last_sequence:{}, wal_sequence:{}",
- table_data.name,table_data.id,
- table_data.last_sequence(),
- sequence
- );
- }
- }
-
debug!(
"Instance write finished, update sequence, table:{}, table_id:{}
last_sequence:{}",
table_data.name, table_data.id, sequence
diff --git a/src/wal/Cargo.toml b/src/wal/Cargo.toml
index 0d13ef36..30a5b004 100644
--- a/src/wal/Cargo.toml
+++ b/src/wal/Cargo.toml
@@ -47,6 +47,7 @@ name = "read_write"
required-features = ["wal-message-queue", "wal-table-kv", "wal-rocksdb",
"wal-local-storage"]
[dependencies]
+anyhow = { workspace = true }
async-trait = { workspace = true }
bytes_ext = { workspace = true }
chrono = { workspace = true }
diff --git a/src/wal/src/local_storage_impl/config.rs
b/src/wal/src/local_storage_impl/config.rs
index 8d018896..b3e70e9b 100644
--- a/src/wal/src/local_storage_impl/config.rs
+++ b/src/wal/src/local_storage_impl/config.rs
@@ -18,17 +18,18 @@
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(default)]
pub struct LocalStorageConfig {
- pub path: String,
- pub max_segment_size: usize,
+ pub data_dir: String,
+ pub segment_size: usize,
pub cache_size: usize,
}
impl Default for LocalStorageConfig {
fn default() -> Self {
Self {
- path: "/tmp/horaedb".to_string(),
- max_segment_size: 64 * 1024 * 1024, // 64MB
+ data_dir: "/tmp/horaedb".to_string(),
+ segment_size: 64 * 1024 * 1024, // 64MB
cache_size: 3,
}
}
diff --git a/src/wal/src/local_storage_impl/segment.rs
b/src/wal/src/local_storage_impl/segment.rs
index 80b37182..02bd1d13 100644
--- a/src/wal/src/local_storage_impl/segment.rs
+++ b/src/wal/src/local_storage_impl/segment.rs
@@ -16,6 +16,7 @@
// under the License.
use std::{
+ cmp::{max, min},
collections::{HashMap, VecDeque},
fmt::Debug,
fs::{self, File, OpenOptions},
@@ -56,8 +57,19 @@ pub enum Error {
#[snafu(display("Failed to map file to memory: {}", source))]
Mmap { source: io::Error },
- #[snafu(display("Segment full"))]
- SegmentFull,
+ #[snafu(display(
+ "Segment {} is full, current size: {}, segment size: {}, try to
append: {}",
+ id,
+ current_size,
+ segment_size,
+ data_size
+ ))]
+ SegmentFull {
+ id: u64,
+ current_size: usize,
+ segment_size: usize,
+ data_size: usize,
+ },
#[snafu(display("Failed to append data to segment file: {}", source))]
SegmentAppend { source: io::Error },
@@ -115,6 +127,9 @@ pub enum Error {
source: GenericError,
backtrace: Backtrace,
},
+
+ #[snafu(display("{}", source))]
+ Internal { source: anyhow::Error },
}
define_result!(Error);
@@ -158,6 +173,10 @@ pub struct Segment {
/// The maximum sequence number of records within this segment.
max_seq: SequenceNumber,
+ /// A hashmap storing both min and max sequence numbers of records within
+ /// this segment for each `TableId`.
+ table_ranges: HashMap<TableId, (SequenceNumber, SequenceNumber)>,
+
/// The encoding format used for records within this segment.
record_encoding: RecordEncoding,
@@ -187,15 +206,16 @@ impl Segment {
version: NEWEST_WAL_SEGMENT_VERSION,
path: path.clone(),
id: segment_id,
- current_size: SEGMENT_HEADER.len(),
+ current_size: VERSION_SIZE + SEGMENT_HEADER.len(),
segment_size,
min_seq: MAX_SEQUENCE_NUMBER,
max_seq: MIN_SEQUENCE_NUMBER,
+ table_ranges: HashMap::new(),
record_encoding: RecordEncoding::newest(),
mmap: None,
record_position: Vec::new(),
write_count: 0,
- last_flushed_position: SEGMENT_HEADER.len(),
+ last_flushed_position: VERSION_SIZE + SEGMENT_HEADER.len(),
};
if !Path::new(&path).exists() {
@@ -208,9 +228,12 @@ impl Segment {
return Ok(segment);
}
- // Open the segment file to update min and max sequence number and
file size
+ // Open the segment file
segment.open()?;
+ // Restore meta info
+ segment.restore_meta()?;
+
// Close the segment file. If the segment is to be used for read or
write, it
// will be opened again
segment.close()?;
@@ -218,7 +241,64 @@ impl Segment {
Ok(segment)
}
+ fn restore_meta(&mut self) -> Result<()> {
+ // Ensure the segment file is open
+ let Some(mmap) = &mut self.mmap else {
+ return SegmentNotOpen { id: self.id }.fail();
+ };
+ let file_size = mmap.len();
+
+ // Read and validate all records
+ let mut pos = VERSION_SIZE + SEGMENT_HEADER.len();
+ let mut record_position = Vec::new();
+
+ self.table_ranges.clear();
+
+ // Scan all records in the segment
+ while pos < file_size {
+ let data = &mmap[pos..];
+
+ match self.record_encoding.decode(data).box_err() {
+ Ok(record) => {
+ record_position.push(Position {
+ start: pos,
+ end: pos + record.len(),
+ });
+
+ // Update max sequence number
+ self.min_seq = min(self.min_seq, record.sequence_num);
+ self.max_seq = max(self.max_seq, record.sequence_num);
+
+ // Update table_ranges
+ self.table_ranges
+ .entry(record.table_id)
+ .and_modify(|seq_range| {
+ seq_range.0 = min(seq_range.0,
record.sequence_num);
+ seq_range.1 = max(seq_range.1,
record.sequence_num);
+ })
+ .or_insert((record.sequence_num, record.sequence_num));
+
+ pos += record.len();
+ }
+ Err(_) => {
+ // If decoding fails, we've reached the end of valid data
+ // TODO: too tricky, refactor later
+ break;
+ }
+ }
+ }
+
+ self.segment_size = file_size;
+ self.record_position = record_position;
+ self.current_size = pos;
+ self.write_count = 0;
+ self.last_flushed_position = pos;
+ Ok(())
+ }
+
pub fn open(&mut self) -> Result<()> {
+ assert!(self.mmap.is_none());
+
// Open the segment file
let file = OpenOptions::new()
.read(true)
@@ -245,50 +325,19 @@ impl Segment {
let header = &mmap[VERSION_SIZE..VERSION_SIZE + header_len];
ensure!(header == SEGMENT_HEADER, InvalidHeader);
- // Read and validate all records
- let mut pos = VERSION_SIZE + header_len;
- let mut record_position = Vec::new();
-
- // Update min and max sequence number
- let mut min_seq = MAX_SEQUENCE_NUMBER;
- let mut max_seq = MIN_SEQUENCE_NUMBER;
-
- while pos < file_size as usize {
- let data = &mmap[pos..];
-
- match self.record_encoding.decode(data).box_err() {
- Ok(record) => {
- record_position.push(Position {
- start: pos,
- end: pos + record.len(),
- });
- min_seq = min_seq.min(record.sequence_num);
- max_seq = max_seq.max(record.sequence_num);
- pos += record.len();
- }
- Err(_) => {
- // If decoding fails, we've reached the end of valid data
- // TODO: too tricky, refactor later
- break;
- }
- }
- }
-
self.mmap = Some(mmap);
- self.record_position = record_position;
- self.current_size = pos;
- self.write_count = 0;
- self.last_flushed_position = pos;
- self.min_seq = min_seq;
- self.max_seq = max_seq;
+
Ok(())
}
pub fn close(&mut self) -> Result<()> {
if let Some(ref mut mmap) = self.mmap {
// Flush before closing
- mmap.flush_range(self.last_flushed_position, self.current_size)
- .context(Flush)?;
+ mmap.flush_range(
+ self.last_flushed_position,
+ self.current_size - self.last_flushed_position,
+ )
+ .context(Flush)?;
// Reset the write count
self.write_count = 0;
// Update the last flushed position
@@ -298,15 +347,16 @@ impl Segment {
Ok(())
}
- pub fn is_open(&self) -> bool {
- self.mmap.is_some()
- }
-
/// Append a slice to the segment file.
fn append(&mut self, data: &[u8]) -> Result<()> {
ensure!(
self.current_size + data.len() <= self.segment_size,
- SegmentFull
+ SegmentFull {
+ id: self.id,
+ current_size: self.current_size,
+ segment_size: self.segment_size,
+ data_size: data.len()
+ }
);
// Ensure the segment file is open
@@ -324,8 +374,11 @@ impl Segment {
// Only flush if the write_count reaches FLUSH_INTERVAL
if self.write_count >= FLUSH_INTERVAL {
- mmap.flush_range(self.last_flushed_position, self.current_size +
data.len())
- .context(Flush)?;
+ mmap.flush_range(
+ self.last_flushed_position,
+ self.current_size + data.len() - self.last_flushed_position,
+ )
+ .context(Flush)?;
// Reset the write count
self.write_count = 0;
// Update the last flushed position
@@ -360,6 +413,7 @@ impl Segment {
&mut self,
data: &[u8],
positions: &mut Vec<Position>,
+ table_id: TableId,
prev_sequence_num: SequenceNumber,
next_sequence_num: SequenceNumber,
) -> Result<()> {
@@ -370,9 +424,44 @@ impl Segment {
self.record_position.append(positions);
// Update min and max sequence number
- self.min_seq = self.min_seq.min(prev_sequence_num);
- self.max_seq = self.max_seq.max(next_sequence_num - 1);
+ self.min_seq = min(self.min_seq, prev_sequence_num);
+ self.max_seq = max(self.max_seq, next_sequence_num - 1);
+
+ // Update sequence range
+ self.table_ranges
+ .entry(table_id)
+ .and_modify(|seq_range| {
+ seq_range.0 = min(seq_range.0, prev_sequence_num);
+ seq_range.1 = max(seq_range.1, next_sequence_num - 1);
+ })
+ .or_insert((prev_sequence_num, next_sequence_num - 1));
+
+ Ok(())
+ }
+
+ fn mark_deleted(&mut self, table_id: TableId, sequence_num:
SequenceNumber) {
+ if let Some(range) = self.table_ranges.get_mut(&table_id) {
+ // If sequence number is MAX, remove the range directly to prevent
overflow
+ if sequence_num == MAX_SEQUENCE_NUMBER {
+ self.table_ranges.remove(&table_id);
+ return;
+ }
+ range.0 = max(range.0, sequence_num + 1);
+ if range.0 > range.1 {
+ self.table_ranges.remove(&table_id);
+ }
+ }
+ }
+
+ fn is_empty(&self) -> bool {
+ self.table_ranges.is_empty()
+ }
+ fn delete(&mut self) -> Result<()> {
+ self.close()?;
+ fs::remove_file(&self.path)
+ .map_err(anyhow::Error::new)
+ .context(Internal)?;
Ok(())
}
}
@@ -383,10 +472,13 @@ pub struct SegmentManager {
all_segments: Mutex<HashMap<u64, Arc<Mutex<Segment>>>>,
/// Cache for opened segments
- cache: Mutex<VecDeque<u64>>,
+ cache: Mutex<VecDeque<(u64, Arc<Mutex<Segment>>)>>,
/// Maximum size of the cache
cache_size: usize,
+
+ /// The latest segment for appending logs
+ current_segment: Mutex<Arc<Mutex<Segment>>>,
}
impl SegmentManager {
@@ -396,63 +488,129 @@ impl SegmentManager {
Ok(())
}
- /// Obtain the target segment
- fn get_segment(&self, segment_id: u64) -> Result<Arc<Mutex<Segment>>> {
- let all_segments = self.all_segments.lock().unwrap();
-
- let segment = all_segments.get(&segment_id);
-
- let segment = match segment {
- Some(segment) => segment,
- None => return SegmentNotFound { id: segment_id }.fail(),
- };
-
- Ok(segment.clone())
- }
-
/// Open segment if it is not in cache, need to acquire the lock outside
- fn open_segment(&self, segment: &mut Segment) -> Result<()> {
+ /// otherwise the segment may get closed again.
+ fn open_segment(&self, guard: &mut Segment, segment: Arc<Mutex<Segment>>)
-> Result<()> {
+ if guard.mmap.is_some() {
+ return Ok(());
+ }
+
let mut cache = self.cache.lock().unwrap();
- // Check if segment is already in cache
- if cache.iter().any(|id| *id == segment.id) {
+ let already_opened = cache.iter().any(|(id, _)| *id == guard.id);
+ if already_opened {
return Ok(());
}
- // If not in cache, load from disk
- segment.open()?;
+ guard.open()?;
- // Add to cache
+ // Try evicting the oldest segment if the cache is full
if cache.len() == self.cache_size {
- let evicted_segment_id = cache.pop_front();
- if let Some(evicted_segment_id) = evicted_segment_id {
+ let evicted_segment = cache.pop_front();
+ if let Some((_, evicted_segment)) = evicted_segment {
// The evicted segment should be closed first
- let evicted_segment = self.get_segment(evicted_segment_id)?;
let mut evicted_segment = evicted_segment.lock().unwrap();
evicted_segment.close()?;
}
}
- cache.push_back(segment.id);
+ cache.push_back((guard.id, segment.clone()));
+ Ok(())
+ }
+ pub fn close_all(&self) -> Result<()> {
+ {
+ let mut cache = self.cache.lock().unwrap();
+ cache.clear();
+ }
+ let all_segments = self.all_segments.lock().unwrap();
+ for segment in all_segments.values() {
+ segment.lock().unwrap().close()?;
+ }
Ok(())
}
pub fn mark_delete_entries_up_to(
&self,
- _location: WalLocation,
- _sequence_num: SequenceNumber,
+ location: WalLocation,
+ sequence_num: SequenceNumber,
) -> Result<()> {
- todo!()
+ let current_segment_id =
self.current_segment.lock().unwrap().lock().unwrap().id;
+ let mut all_segments = self.all_segments.lock().unwrap();
+ let mut segments_to_remove = Vec::new();
+
+ for (_, segment) in all_segments.iter() {
+ let mut guard = segment.lock().unwrap();
+
+ guard.mark_deleted(location.table_id, sequence_num);
+
+ // Delete this segment if it is empty and its id is less than the
current
+ // segment
+ if guard.is_empty() && guard.id < current_segment_id {
+ let mut cache = self.cache.lock().unwrap();
+
+ // Check if segment is already in cache
+ if let Some(index) = cache.iter().position(|(id, _)| *id ==
guard.id) {
+ cache.remove(index);
+ }
+
+ segments_to_remove.push((guard.id, segment.clone()));
+ }
+ }
+
+ // Delete segments in all_segments
+ for (segment_id, _) in segments_to_remove.iter() {
+ all_segments.remove(segment_id);
+ }
+
+ drop(all_segments);
+
+ // Delete segments on disk
+ for (_, segment) in segments_to_remove.iter() {
+ segment.lock().unwrap().delete()?;
+ }
+
+ Ok(())
}
- pub fn close_all(&self) -> Result<()> {
- let mut cache = self.cache.lock().unwrap();
- cache.clear();
+ pub fn get_relevant_segments(
+ &self,
+ table_id: Option<TableId>,
+ start: SequenceNumber,
+ end: SequenceNumber,
+ ) -> Result<Vec<Arc<Mutex<Segment>>>> {
+ // Find all segments that contain the requested sequence numbers
+ let mut relevant_segments = Vec::new();
+
let all_segments = self.all_segments.lock().unwrap();
+
for segment in all_segments.values() {
- segment.lock().unwrap().close()?;
+ let guard = segment.lock().unwrap();
+ match table_id {
+ Some(table_id) => {
+ if let Some(range) = guard.table_ranges.get(&table_id) {
+ if range.0 <= end && range.1 >= start {
+ relevant_segments.push((guard.id,
segment.clone()));
+ }
+ }
+ }
+ None => {
+ if guard.min_seq <= end && guard.max_seq >= start {
+ relevant_segments.push((guard.id, segment.clone()));
+ }
+ }
+ }
}
- Ok(())
+
+ // Sort by segment id
+ relevant_segments.sort_by_key(|(id, _)| *id);
+
+ // id is not needed, so remove it
+ let relevant_segments = relevant_segments
+ .into_iter()
+ .map(|(_, segment)| segment)
+ .collect();
+
+ Ok(relevant_segments)
}
}
@@ -482,9 +640,6 @@ pub struct Region {
/// Sequence number for the next log
next_sequence_num: AtomicU64,
-
- /// The latest segment for appending logs
- current_segment: Mutex<Arc<Mutex<Segment>>>,
}
impl Region {
@@ -557,6 +712,7 @@ impl Region {
all_segments: Mutex::new(all_segments),
cache: Mutex::new(VecDeque::new()),
cache_size,
+ current_segment: Mutex::new(latest_segment),
};
Ok(Self {
@@ -568,17 +724,29 @@ impl Region {
region_dir,
next_sequence_num: AtomicU64::new(next_sequence_num),
runtime,
- current_segment: Mutex::new(latest_segment),
})
}
+ fn create_new_segment(&self, id: u64) -> Result<Arc<Mutex<Segment>>> {
+ // Create a new segment
+ let new_segment = Segment::new(
+ format!("{}/segment_{}.wal", self.region_dir, id),
+ id,
+ self.segment_size,
+ )?;
+ let new_segment = Arc::new(Mutex::new(new_segment));
+ self.segment_manager.add_segment(id, new_segment.clone())?;
+
+ Ok(new_segment)
+ }
+
pub fn write(&self, _ctx: &WriteContext, batch: &LogWriteBatch) ->
Result<SequenceNumber> {
// In the WAL based on local storage, we need to ensure the sequence
number in
// segment is monotonically increasing. So we need to acquire a lock
here.
// Perhaps we could avoid acquiring the lock here and instead allocate
the
// position that needs to be written in the segment, then fill it
within
// spawn_blocking. However, I’m not sure about the correctness of this
approach.
- let mut current_segment = self.current_segment.lock().unwrap();
+ let mut current_segment =
self.segment_manager.current_segment.lock().unwrap();
let entries_num = batch.len() as u64;
let table_id = batch.location.table_id;
@@ -608,35 +776,23 @@ impl Region {
next_sequence_num += 1;
}
- let guard = current_segment.lock().unwrap();
-
// Check if the current segment has enough space for the new data
// If not, create a new segment and update current_segment
- if guard.current_size + data.len() > guard.segment_size {
- let new_segment_id = guard.id + 1;
- // We need to drop guard to allow the update of current_segment
- drop(guard);
-
- // Create a new segment
- let new_segment = Segment::new(
- format!("{}/segment_{}.wal", self.region_dir, new_segment_id),
- new_segment_id,
- self.segment_size,
- )?;
- let new_segment = Arc::new(Mutex::new(new_segment));
- self.segment_manager
- .add_segment(new_segment_id, new_segment.clone())?;
+ {
+ let guard = current_segment.lock().unwrap();
+ if guard.current_size + data.len() > guard.segment_size {
+ let new_segment_id = guard.id + 1;
+ drop(guard);
- // Update current segment
- *current_segment = new_segment;
- } else {
- drop(guard);
+ *current_segment = self.create_new_segment(new_segment_id)?;
+ }
}
+ // Open the segment if not opened
let mut guard = current_segment.lock().unwrap();
+ self.segment_manager
+ .open_segment(&mut guard, current_segment.clone())?;
- // Open the segment if not opened
- self.segment_manager.open_segment(&mut guard)?;
for pos in record_position.iter_mut() {
pos.start += guard.current_size;
pos.end += guard.current_size;
@@ -646,8 +802,9 @@ impl Region {
guard.append_records(
&data,
&mut record_position,
+ table_id,
prev_sequence_num,
- next_sequence_num - 1,
+ next_sequence_num,
)?;
Ok(next_sequence_num - 1)
}
@@ -742,6 +899,8 @@ impl RegionManager {
segment_size: usize,
runtime: Arc<Runtime>,
) -> Result<Self> {
+ fs::create_dir_all(&root_dir).context(DirOpen)?;
+
let mut regions = HashMap::new();
// Naming conversion: <root_dir>/<region_id>
@@ -864,6 +1023,9 @@ struct SegmentLogIterator {
/// Optional identifier for the table, which is used to filter logs.
table_id: Option<TableId>,
+ /// A hashmap of start and end sequence number for each table.
+ table_ranges: HashMap<TableId, (SequenceNumber, SequenceNumber)>,
+
/// Starting sequence number for log iteration.
start: SequenceNumber,
@@ -887,17 +1049,12 @@ impl SegmentLogIterator {
start: SequenceNumber,
end: SequenceNumber,
) -> Result<Self> {
+ let mut guard = segment.lock().unwrap();
// Open the segment if it is not open
- let mut segment = segment.lock().unwrap();
- if !segment.is_open() {
- segment_manager.open_segment(&mut segment)?;
- }
-
- // Read the entire content of the segment
- let segment_content = segment.read(0, segment.current_size)?;
-
- // Get record positions
- let record_positions = segment.record_position.clone();
+ segment_manager.open_segment(&mut guard, segment.clone())?;
+ let segment_content = guard.read(0, guard.current_size)?;
+ let record_positions = guard.record_position.clone();
+ let table_ranges = guard.table_ranges.clone();
Ok(Self {
log_encoding,
@@ -905,6 +1062,7 @@ impl SegmentLogIterator {
segment_content,
record_positions,
table_id,
+ table_ranges,
start,
end,
current_record_idx: 0,
@@ -952,6 +1110,15 @@ impl SegmentLogIterator {
}
}
+ // Filter by sequence range
+ if let Some((start, end)) =
&self.table_ranges.get(&record.table_id) {
+ if record.sequence_num < *start || record.sequence_num > *end {
+ continue;
+ }
+ } else {
+ continue;
+ }
+
// Decode the value
let value = self
.log_encoding
@@ -975,7 +1142,7 @@ pub struct MultiSegmentLogIterator {
segment_manager: Arc<SegmentManager>,
/// All segments involved in this read operation.
- segments: Vec<u64>,
+ segments: Vec<Arc<Mutex<Segment>>>,
/// Current segment index.
current_segment_idx: usize,
@@ -1011,22 +1178,7 @@ impl MultiSegmentLogIterator {
start: SequenceNumber,
end: SequenceNumber,
) -> Result<Self> {
- // Find all segments that contain the requested sequence numbers
- let mut relevant_segments = Vec::new();
-
- {
- let all_segments = segment_manager.all_segments.lock().unwrap();
-
- for (_, segment) in all_segments.iter() {
- let segment = segment.lock().unwrap();
- if segment.min_seq <= end && segment.max_seq >= start {
- relevant_segments.push(segment.id);
- }
- }
- }
-
- // Sort by segment id
- relevant_segments.sort_unstable();
+ let relevant_segments =
segment_manager.get_relevant_segments(table_id, start, end)?;
let mut iter = Self {
segment_manager,
@@ -1053,8 +1205,7 @@ impl MultiSegmentLogIterator {
return Ok(false);
}
- let segment = self.segments[self.current_segment_idx];
- let segment = self.segment_manager.get_segment(segment)?;
+ let segment = self.segments[self.current_segment_idx].clone();
let iterator = SegmentLogIterator::new(
self.log_encoding.clone(),
self.record_encoding.clone(),
@@ -1129,7 +1280,7 @@ mod tests {
assert_eq!(segment.version, NEWEST_WAL_SEGMENT_VERSION);
assert_eq!(segment.path, path);
assert_eq!(segment.id, 0);
- assert_eq!(segment.current_size, SEGMENT_HEADER.len());
+ assert_eq!(segment.current_size, SEGMENT_HEADER.len() + VERSION_SIZE);
let segment_content = fs::read(path).unwrap();
assert_eq!(segment_content[0], NEWEST_WAL_SEGMENT_VERSION);
@@ -1153,6 +1304,10 @@ mod tests {
segment
.open()
.expect("Expected to open segment successfully");
+
+ segment
+ .restore_meta()
+ .expect("Expected to restore meta successfully");
}
#[test]
@@ -1166,6 +1321,7 @@ mod tests {
.to_string();
let mut segment = Segment::new(path.clone(), 0, SEGMENT_SIZE).unwrap();
segment.open().unwrap();
+ segment.restore_meta().unwrap();
let data = b"test_data";
let append_result = segment.append(data);
@@ -1176,6 +1332,26 @@ mod tests {
assert_eq!(read_result.unwrap(), data);
}
+ #[test]
+ fn test_segment_delete() {
+ let dir = tempdir().unwrap();
+
+ let path = dir
+ .path()
+ .join("segment_0.wal")
+ .to_str()
+ .unwrap()
+ .to_string();
+
+ let mut segment = Segment::new(path.clone(), 0, SEGMENT_SIZE).unwrap();
+
+ segment.open().unwrap();
+ assert!(Path::new(&path).exists());
+
+ segment.delete().unwrap();
+ assert!(!Path::new(&path).exists());
+ }
+
#[test]
fn test_region_create_and_close() {
let dir = tempdir().unwrap();
@@ -1190,8 +1366,6 @@ mod tests {
)
.unwrap();
- let _segment = region.segment_manager.get_segment(0).unwrap();
-
region.close().unwrap()
}
@@ -1300,4 +1474,70 @@ mod tests {
let runtime = Arc::new(Builder::default().build().unwrap());
runtime.block_on(test_multi_segment_write_and_read_inner(runtime.clone()));
}
+
+ #[test]
+ fn test_region_mark_delete_entries_up_to() {
+ const SEGMENT_SIZE: usize = 4096;
+
+ let dir = tempdir().unwrap();
+ let runtime = Arc::new(Builder::default().build().unwrap());
+
+ // Create a new region
+ let region = Region::new(
+ 1,
+ 2,
+ SEGMENT_SIZE,
+ dir.path().to_str().unwrap().to_string(),
+ runtime.clone(),
+ )
+ .unwrap();
+ let region = Arc::new(region);
+
+ // Write some log entries
+ let location = WalLocation::new(1, 1); // region_id = 1, table_id = 1
+ let mut sequence = MIN_SEQUENCE_NUMBER + 1;
+
+ for _i in 0..10 {
+ let log_entries = 0..100;
+ let log_batch_encoder = LogBatchEncoder::create(location);
+ let log_batch = log_batch_encoder
+ .encode_batch(log_entries.clone().map(|v| MemoryPayload { val:
v }))
+ .expect("should succeed to encode payloads");
+
+ let write_ctx = WriteContext::default();
+ let actual_sequence = region.write(&write_ctx,
&log_batch).unwrap();
+ assert_eq!(actual_sequence, sequence + 100 - 1);
+ sequence += 100;
+ }
+
+ let latest_segment_id = {
+ let all_segments =
region.segment_manager.all_segments.lock().unwrap();
+ // Expect more than one segment
+ assert!(
+ all_segments.len() > 1,
+ "Expected multiple segments, but got {}",
+ all_segments.len()
+ );
+ all_segments.keys().max().unwrap().to_owned()
+ };
+
+ // Mark delete entries up to sequence - 1, so only the last segment
should
+ // remain
+ let mark_delete_sequence = sequence - 1;
+ region
+ .mark_delete_entries_up_to(location, mark_delete_sequence)
+ .unwrap();
+
+ {
+ let all_segments =
region.segment_manager.all_segments.lock().unwrap();
+ assert_eq!(all_segments.len(), 1);
+ assert!(all_segments.contains_key(&latest_segment_id));
+ }
+
+ // The num of segment in the dir should be 1
+ let segment_count = fs::read_dir(dir.path()).unwrap().count();
+ assert_eq!(segment_count, 1);
+
+ region.close().unwrap();
+ }
}
diff --git a/src/wal/src/local_storage_impl/wal_manager.rs
b/src/wal/src/local_storage_impl/wal_manager.rs
index 91c69fcd..694831ea 100644
--- a/src/wal/src/local_storage_impl/wal_manager.rs
+++ b/src/wal/src/local_storage_impl/wal_manager.rs
@@ -54,14 +54,14 @@ impl LocalStorageImpl {
) -> Result<Self> {
let LocalStorageConfig {
cache_size,
- max_segment_size,
+ segment_size,
..
} = config.clone();
let wal_path_str = wal_path.to_str().unwrap().to_string();
let region_manager = RegionManager::new(
wal_path_str.clone(),
cache_size,
- max_segment_size,
+ segment_size,
runtime.clone(),
)
.box_err()
@@ -104,6 +104,10 @@ impl WalManager for LocalStorageImpl {
location: WalLocation,
sequence_num: SequenceNumber,
) -> Result<()> {
+ debug!(
+ "Mark delete entries up to {} for location:{:?}",
+ sequence_num, location
+ );
self.region_manager
.mark_delete_entries_up_to(location, sequence_num)
.box_err()
@@ -111,10 +115,7 @@ impl WalManager for LocalStorageImpl {
}
async fn close_region(&self, region_id: RegionId) -> Result<()> {
- debug!(
- "Close region for LocalStorage based WAL is noop operation,
region_id:{}",
- region_id
- );
+ debug!("Close region {} for LocalStorage based WAL", region_id);
self.region_manager
.close(region_id)
.box_err()
@@ -133,10 +134,15 @@ impl WalManager for LocalStorageImpl {
ctx: &ReadContext,
req: &ReadRequest,
) -> Result<BatchLogIteratorAdapter> {
+ debug!(
+ "Read batch from LocalStorage based WAL, ctx:{:?}, req:{:?}",
+ ctx, req
+ );
self.region_manager.read(ctx, req).box_err().context(Read)
}
async fn write(&self, ctx: &WriteContext, batch: &LogWriteBatch) ->
Result<SequenceNumber> {
+ debug!("Write batch to LocalStorage based WAL, ctx:{:?}", ctx);
self.region_manager
.write(ctx, batch)
.box_err()
@@ -144,6 +150,10 @@ impl WalManager for LocalStorageImpl {
}
async fn scan(&self, ctx: &ScanContext, req: &ScanRequest) ->
Result<BatchLogIteratorAdapter> {
+ debug!(
+ "Scan from LocalStorage based WAL, ctx:{:?}, req:{:?}",
+ ctx, req
+ );
self.region_manager.scan(ctx, req).box_err().context(Read)
}
@@ -181,7 +191,7 @@ impl WalsOpener for LocalStorageWalsOpener {
};
let write_runtime = runtimes.write_runtime.clone();
- let data_path = Path::new(&local_storage_wal_config.path);
+ let data_path = Path::new(&local_storage_wal_config.data_dir);
let data_wal = if config.disable_data {
Arc::new(crate::dummy::DoNothing)
diff --git a/src/wal/tests/read_write.rs b/src/wal/tests/read_write.rs
index cccde53c..24c4c75f 100644
--- a/src/wal/tests/read_write.rs
+++ b/src/wal/tests/read_write.rs
@@ -72,7 +72,6 @@ fn test_kafka_wal() {
}
#[test]
-#[ignore = "this test cannot pass completely, since delete is not supported
yet"]
fn test_local_storage_wal() {
let builder = LocalStorageWalBuilder;
test_all(builder, false);
@@ -997,7 +996,7 @@ impl WalBuilder for LocalStorageWalBuilder {
async fn build(&self, data_path: &Path, runtime: Arc<Runtime>) ->
Arc<Self::Wal> {
let config = LocalStorageConfig {
- path: data_path.to_str().unwrap().to_string(),
+ data_dir: data_path.to_str().unwrap().to_string(),
..LocalStorageConfig::default()
};
Arc::new(LocalStorageImpl::new(data_path.to_path_buf(), config,
runtime).unwrap())
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]