This is an automated email from the ASF dual-hosted git repository.

jiacai2050 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/horaedb.git


The following commit(s) were added to refs/heads/main by this push:
     new 645a8b35 feat: Implement delete operation for WAL based on local 
storage (#1566)
645a8b35 is described below

commit 645a8b3510a0a6e43c8b6e0688766470040ef164
Author: Draco <[email protected]>
AuthorDate: Sat Sep 14 09:59:40 2024 +0800

    feat: Implement delete operation for WAL based on local storage (#1566)
    
    ## Rationale
    
    Currently the WAL based on the local disk does not support the delete
    function. This PR implements that functionality.
    
    This is a follow-up task of #1552 and #1556.
    
    ## Detailed Changes
    
    1. For each `Segment`, add a hashmap to record the minimum and maximum
    sequence numbers of all tables within that segment. During `delete` and
    `write` operations, this hashmap will be updated. During read
    operations, logs will be filtered based on this hashmap.
    
    2. During the `delete` operation, based on the aforementioned hashmap,
    if all logs of all tables in a read-only segment (a segment that is not
    currently being written to) are marked as deleted, the segment file will
    be physically deleted from the disk.
    
    ## Test Plan
    
    Unit tests, TSBS, and a locally run script that repeatedly inserts
    data, forcibly kills the database process, and restarts it to test
    persistence.
---
 Cargo.lock                                    |   1 +
 src/analytic_engine/src/instance/write.rs     |  15 -
 src/wal/Cargo.toml                            |   1 +
 src/wal/src/local_storage_impl/config.rs      |   9 +-
 src/wal/src/local_storage_impl/segment.rs     | 528 +++++++++++++++++++-------
 src/wal/src/local_storage_impl/wal_manager.rs |  24 +-
 src/wal/tests/read_write.rs                   |   3 +-
 7 files changed, 409 insertions(+), 172 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 94c22e7a..43d74e7e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -8219,6 +8219,7 @@ checksum = 
"9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca"
 name = "wal"
 version = "2.0.0"
 dependencies = [
+ "anyhow",
  "async-trait",
  "bytes_ext",
  "chrono",
diff --git a/src/analytic_engine/src/instance/write.rs 
b/src/analytic_engine/src/instance/write.rs
index 95f2efce..8a007d5c 100644
--- a/src/analytic_engine/src/instance/write.rs
+++ b/src/analytic_engine/src/instance/write.rs
@@ -531,21 +531,6 @@ impl<'a> Writer<'a> {
                 e
             })?;
 
-        // When wal is disabled, there is no need to do this check.
-        if !self.instance.disable_wal {
-            // NOTE: Currently write wal will only increment seq by one,
-            // this may change in future.
-            let last_seq = table_data.last_sequence();
-            if sequence != last_seq + 1 {
-                warn!(
-                    "Sequence must be consecutive, table:{}, table_id:{}, 
last_sequence:{}, wal_sequence:{}",
-                    table_data.name,table_data.id,
-                    table_data.last_sequence(),
-                    sequence
-                );
-            }
-        }
-
         debug!(
             "Instance write finished, update sequence, table:{}, table_id:{} 
last_sequence:{}",
             table_data.name, table_data.id, sequence
diff --git a/src/wal/Cargo.toml b/src/wal/Cargo.toml
index 0d13ef36..30a5b004 100644
--- a/src/wal/Cargo.toml
+++ b/src/wal/Cargo.toml
@@ -47,6 +47,7 @@ name = "read_write"
 required-features = ["wal-message-queue", "wal-table-kv", "wal-rocksdb", 
"wal-local-storage"]
 
 [dependencies]
+anyhow = { workspace = true }
 async-trait = { workspace = true }
 bytes_ext = { workspace = true }
 chrono = { workspace = true }
diff --git a/src/wal/src/local_storage_impl/config.rs 
b/src/wal/src/local_storage_impl/config.rs
index 8d018896..b3e70e9b 100644
--- a/src/wal/src/local_storage_impl/config.rs
+++ b/src/wal/src/local_storage_impl/config.rs
@@ -18,17 +18,18 @@
 use serde::{Deserialize, Serialize};
 
 #[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(default)]
 pub struct LocalStorageConfig {
-    pub path: String,
-    pub max_segment_size: usize,
+    pub data_dir: String,
+    pub segment_size: usize,
     pub cache_size: usize,
 }
 
 impl Default for LocalStorageConfig {
     fn default() -> Self {
         Self {
-            path: "/tmp/horaedb".to_string(),
-            max_segment_size: 64 * 1024 * 1024, // 64MB
+            data_dir: "/tmp/horaedb".to_string(),
+            segment_size: 64 * 1024 * 1024, // 64MB
             cache_size: 3,
         }
     }
diff --git a/src/wal/src/local_storage_impl/segment.rs 
b/src/wal/src/local_storage_impl/segment.rs
index 80b37182..02bd1d13 100644
--- a/src/wal/src/local_storage_impl/segment.rs
+++ b/src/wal/src/local_storage_impl/segment.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 use std::{
+    cmp::{max, min},
     collections::{HashMap, VecDeque},
     fmt::Debug,
     fs::{self, File, OpenOptions},
@@ -56,8 +57,19 @@ pub enum Error {
     #[snafu(display("Failed to map file to memory: {}", source))]
     Mmap { source: io::Error },
 
-    #[snafu(display("Segment full"))]
-    SegmentFull,
+    #[snafu(display(
+        "Segment {} is full, current size: {}, segment size: {}, try to 
append: {}",
+        id,
+        current_size,
+        segment_size,
+        data_size
+    ))]
+    SegmentFull {
+        id: u64,
+        current_size: usize,
+        segment_size: usize,
+        data_size: usize,
+    },
 
     #[snafu(display("Failed to append data to segment file: {}", source))]
     SegmentAppend { source: io::Error },
@@ -115,6 +127,9 @@ pub enum Error {
         source: GenericError,
         backtrace: Backtrace,
     },
+
+    #[snafu(display("{}", source))]
+    Internal { source: anyhow::Error },
 }
 
 define_result!(Error);
@@ -158,6 +173,10 @@ pub struct Segment {
     /// The maximum sequence number of records within this segment.
     max_seq: SequenceNumber,
 
+    /// A hashmap storing both min and max sequence numbers of records within
+    /// this segment for each `TableId`.
+    table_ranges: HashMap<TableId, (SequenceNumber, SequenceNumber)>,
+
     /// The encoding format used for records within this segment.
     record_encoding: RecordEncoding,
 
@@ -187,15 +206,16 @@ impl Segment {
             version: NEWEST_WAL_SEGMENT_VERSION,
             path: path.clone(),
             id: segment_id,
-            current_size: SEGMENT_HEADER.len(),
+            current_size: VERSION_SIZE + SEGMENT_HEADER.len(),
             segment_size,
             min_seq: MAX_SEQUENCE_NUMBER,
             max_seq: MIN_SEQUENCE_NUMBER,
+            table_ranges: HashMap::new(),
             record_encoding: RecordEncoding::newest(),
             mmap: None,
             record_position: Vec::new(),
             write_count: 0,
-            last_flushed_position: SEGMENT_HEADER.len(),
+            last_flushed_position: VERSION_SIZE + SEGMENT_HEADER.len(),
         };
 
         if !Path::new(&path).exists() {
@@ -208,9 +228,12 @@ impl Segment {
             return Ok(segment);
         }
 
-        // Open the segment file to update min and max sequence number and 
file size
+        // Open the segment file
         segment.open()?;
 
+        // Restore meta info
+        segment.restore_meta()?;
+
         // Close the segment file. If the segment is to be used for read or 
write, it
         // will be opened again
         segment.close()?;
@@ -218,7 +241,64 @@ impl Segment {
         Ok(segment)
     }
 
+    fn restore_meta(&mut self) -> Result<()> {
+        // Ensure the segment file is open
+        let Some(mmap) = &mut self.mmap else {
+            return SegmentNotOpen { id: self.id }.fail();
+        };
+        let file_size = mmap.len();
+
+        // Read and validate all records
+        let mut pos = VERSION_SIZE + SEGMENT_HEADER.len();
+        let mut record_position = Vec::new();
+
+        self.table_ranges.clear();
+
+        // Scan all records in the segment
+        while pos < file_size {
+            let data = &mmap[pos..];
+
+            match self.record_encoding.decode(data).box_err() {
+                Ok(record) => {
+                    record_position.push(Position {
+                        start: pos,
+                        end: pos + record.len(),
+                    });
+
+                    // Update min and max sequence numbers
+                    self.min_seq = min(self.min_seq, record.sequence_num);
+                    self.max_seq = max(self.max_seq, record.sequence_num);
+
+                    // Update table_ranges
+                    self.table_ranges
+                        .entry(record.table_id)
+                        .and_modify(|seq_range| {
+                            seq_range.0 = min(seq_range.0, 
record.sequence_num);
+                            seq_range.1 = max(seq_range.1, 
record.sequence_num);
+                        })
+                        .or_insert((record.sequence_num, record.sequence_num));
+
+                    pos += record.len();
+                }
+                Err(_) => {
+                    // If decoding fails, we've reached the end of valid data
+                    // TODO: too tricky, refactor later
+                    break;
+                }
+            }
+        }
+
+        self.segment_size = file_size;
+        self.record_position = record_position;
+        self.current_size = pos;
+        self.write_count = 0;
+        self.last_flushed_position = pos;
+        Ok(())
+    }
+
     pub fn open(&mut self) -> Result<()> {
+        assert!(self.mmap.is_none());
+
         // Open the segment file
         let file = OpenOptions::new()
             .read(true)
@@ -245,50 +325,19 @@ impl Segment {
         let header = &mmap[VERSION_SIZE..VERSION_SIZE + header_len];
         ensure!(header == SEGMENT_HEADER, InvalidHeader);
 
-        // Read and validate all records
-        let mut pos = VERSION_SIZE + header_len;
-        let mut record_position = Vec::new();
-
-        // Update min and max sequence number
-        let mut min_seq = MAX_SEQUENCE_NUMBER;
-        let mut max_seq = MIN_SEQUENCE_NUMBER;
-
-        while pos < file_size as usize {
-            let data = &mmap[pos..];
-
-            match self.record_encoding.decode(data).box_err() {
-                Ok(record) => {
-                    record_position.push(Position {
-                        start: pos,
-                        end: pos + record.len(),
-                    });
-                    min_seq = min_seq.min(record.sequence_num);
-                    max_seq = max_seq.max(record.sequence_num);
-                    pos += record.len();
-                }
-                Err(_) => {
-                    // If decoding fails, we've reached the end of valid data
-                    // TODO: too tricky, refactor later
-                    break;
-                }
-            }
-        }
-
         self.mmap = Some(mmap);
-        self.record_position = record_position;
-        self.current_size = pos;
-        self.write_count = 0;
-        self.last_flushed_position = pos;
-        self.min_seq = min_seq;
-        self.max_seq = max_seq;
+
         Ok(())
     }
 
     pub fn close(&mut self) -> Result<()> {
         if let Some(ref mut mmap) = self.mmap {
             // Flush before closing
-            mmap.flush_range(self.last_flushed_position, self.current_size)
-                .context(Flush)?;
+            mmap.flush_range(
+                self.last_flushed_position,
+                self.current_size - self.last_flushed_position,
+            )
+            .context(Flush)?;
             // Reset the write count
             self.write_count = 0;
             // Update the last flushed position
@@ -298,15 +347,16 @@ impl Segment {
         Ok(())
     }
 
-    pub fn is_open(&self) -> bool {
-        self.mmap.is_some()
-    }
-
     /// Append a slice to the segment file.
     fn append(&mut self, data: &[u8]) -> Result<()> {
         ensure!(
             self.current_size + data.len() <= self.segment_size,
-            SegmentFull
+            SegmentFull {
+                id: self.id,
+                current_size: self.current_size,
+                segment_size: self.segment_size,
+                data_size: data.len()
+            }
         );
 
         // Ensure the segment file is open
@@ -324,8 +374,11 @@ impl Segment {
 
         // Only flush if the write_count reaches FLUSH_INTERVAL
         if self.write_count >= FLUSH_INTERVAL {
-            mmap.flush_range(self.last_flushed_position, self.current_size + 
data.len())
-                .context(Flush)?;
+            mmap.flush_range(
+                self.last_flushed_position,
+                self.current_size + data.len() - self.last_flushed_position,
+            )
+            .context(Flush)?;
             // Reset the write count
             self.write_count = 0;
             // Update the last flushed position
@@ -360,6 +413,7 @@ impl Segment {
         &mut self,
         data: &[u8],
         positions: &mut Vec<Position>,
+        table_id: TableId,
         prev_sequence_num: SequenceNumber,
         next_sequence_num: SequenceNumber,
     ) -> Result<()> {
@@ -370,9 +424,44 @@ impl Segment {
         self.record_position.append(positions);
 
         // Update min and max sequence number
-        self.min_seq = self.min_seq.min(prev_sequence_num);
-        self.max_seq = self.max_seq.max(next_sequence_num - 1);
+        self.min_seq = min(self.min_seq, prev_sequence_num);
+        self.max_seq = max(self.max_seq, next_sequence_num - 1);
+
+        // Update sequence range
+        self.table_ranges
+            .entry(table_id)
+            .and_modify(|seq_range| {
+                seq_range.0 = min(seq_range.0, prev_sequence_num);
+                seq_range.1 = max(seq_range.1, next_sequence_num - 1);
+            })
+            .or_insert((prev_sequence_num, next_sequence_num - 1));
+
+        Ok(())
+    }
+
+    fn mark_deleted(&mut self, table_id: TableId, sequence_num: 
SequenceNumber) {
+        if let Some(range) = self.table_ranges.get_mut(&table_id) {
+            // If sequence number is MAX, remove the range directly to prevent 
overflow
+            if sequence_num == MAX_SEQUENCE_NUMBER {
+                self.table_ranges.remove(&table_id);
+                return;
+            }
+            range.0 = max(range.0, sequence_num + 1);
+            if range.0 > range.1 {
+                self.table_ranges.remove(&table_id);
+            }
+        }
+    }
+
+    fn is_empty(&self) -> bool {
+        self.table_ranges.is_empty()
+    }
 
+    fn delete(&mut self) -> Result<()> {
+        self.close()?;
+        fs::remove_file(&self.path)
+            .map_err(anyhow::Error::new)
+            .context(Internal)?;
         Ok(())
     }
 }
@@ -383,10 +472,13 @@ pub struct SegmentManager {
     all_segments: Mutex<HashMap<u64, Arc<Mutex<Segment>>>>,
 
     /// Cache for opened segments
-    cache: Mutex<VecDeque<u64>>,
+    cache: Mutex<VecDeque<(u64, Arc<Mutex<Segment>>)>>,
 
     /// Maximum size of the cache
     cache_size: usize,
+
+    /// The latest segment for appending logs
+    current_segment: Mutex<Arc<Mutex<Segment>>>,
 }
 
 impl SegmentManager {
@@ -396,63 +488,129 @@ impl SegmentManager {
         Ok(())
     }
 
-    /// Obtain the target segment
-    fn get_segment(&self, segment_id: u64) -> Result<Arc<Mutex<Segment>>> {
-        let all_segments = self.all_segments.lock().unwrap();
-
-        let segment = all_segments.get(&segment_id);
-
-        let segment = match segment {
-            Some(segment) => segment,
-            None => return SegmentNotFound { id: segment_id }.fail(),
-        };
-
-        Ok(segment.clone())
-    }
-
     /// Open segment if it is not in cache, need to acquire the lock outside
-    fn open_segment(&self, segment: &mut Segment) -> Result<()> {
+    /// otherwise the segment may get closed again.
+    fn open_segment(&self, guard: &mut Segment, segment: Arc<Mutex<Segment>>) 
-> Result<()> {
+        if guard.mmap.is_some() {
+            return Ok(());
+        }
+
         let mut cache = self.cache.lock().unwrap();
 
-        // Check if segment is already in cache
-        if cache.iter().any(|id| *id == segment.id) {
+        let already_opened = cache.iter().any(|(id, _)| *id == guard.id);
+        if already_opened {
             return Ok(());
         }
 
-        // If not in cache, load from disk
-        segment.open()?;
+        guard.open()?;
 
-        // Add to cache
+        // Try evicting the oldest segment if the cache is full
         if cache.len() == self.cache_size {
-            let evicted_segment_id = cache.pop_front();
-            if let Some(evicted_segment_id) = evicted_segment_id {
+            let evicted_segment = cache.pop_front();
+            if let Some((_, evicted_segment)) = evicted_segment {
                 // The evicted segment should be closed first
-                let evicted_segment = self.get_segment(evicted_segment_id)?;
                 let mut evicted_segment = evicted_segment.lock().unwrap();
                 evicted_segment.close()?;
             }
         }
-        cache.push_back(segment.id);
+        cache.push_back((guard.id, segment.clone()));
+        Ok(())
+    }
 
+    pub fn close_all(&self) -> Result<()> {
+        {
+            let mut cache = self.cache.lock().unwrap();
+            cache.clear();
+        }
+        let all_segments = self.all_segments.lock().unwrap();
+        for segment in all_segments.values() {
+            segment.lock().unwrap().close()?;
+        }
         Ok(())
     }
 
     pub fn mark_delete_entries_up_to(
         &self,
-        _location: WalLocation,
-        _sequence_num: SequenceNumber,
+        location: WalLocation,
+        sequence_num: SequenceNumber,
     ) -> Result<()> {
-        todo!()
+        let current_segment_id = 
self.current_segment.lock().unwrap().lock().unwrap().id;
+        let mut all_segments = self.all_segments.lock().unwrap();
+        let mut segments_to_remove = Vec::new();
+
+        for (_, segment) in all_segments.iter() {
+            let mut guard = segment.lock().unwrap();
+
+            guard.mark_deleted(location.table_id, sequence_num);
+
+            // Delete this segment if it is empty and its id is less than the 
current
+            // segment
+            if guard.is_empty() && guard.id < current_segment_id {
+                let mut cache = self.cache.lock().unwrap();
+
+                // Check if segment is already in cache
+                if let Some(index) = cache.iter().position(|(id, _)| *id == 
guard.id) {
+                    cache.remove(index);
+                }
+
+                segments_to_remove.push((guard.id, segment.clone()));
+            }
+        }
+
+        // Delete segments in all_segments
+        for (segment_id, _) in segments_to_remove.iter() {
+            all_segments.remove(segment_id);
+        }
+
+        drop(all_segments);
+
+        // Delete segments on disk
+        for (_, segment) in segments_to_remove.iter() {
+            segment.lock().unwrap().delete()?;
+        }
+
+        Ok(())
     }
 
-    pub fn close_all(&self) -> Result<()> {
-        let mut cache = self.cache.lock().unwrap();
-        cache.clear();
+    pub fn get_relevant_segments(
+        &self,
+        table_id: Option<TableId>,
+        start: SequenceNumber,
+        end: SequenceNumber,
+    ) -> Result<Vec<Arc<Mutex<Segment>>>> {
+        // Find all segments that contain the requested sequence numbers
+        let mut relevant_segments = Vec::new();
+
         let all_segments = self.all_segments.lock().unwrap();
+
         for segment in all_segments.values() {
-            segment.lock().unwrap().close()?;
+            let guard = segment.lock().unwrap();
+            match table_id {
+                Some(table_id) => {
+                    if let Some(range) = guard.table_ranges.get(&table_id) {
+                        if range.0 <= end && range.1 >= start {
+                            relevant_segments.push((guard.id, 
segment.clone()));
+                        }
+                    }
+                }
+                None => {
+                    if guard.min_seq <= end && guard.max_seq >= start {
+                        relevant_segments.push((guard.id, segment.clone()));
+                    }
+                }
+            }
         }
-        Ok(())
+
+        // Sort by segment id
+        relevant_segments.sort_by_key(|(id, _)| *id);
+
+        // id is not needed, so remove it
+        let relevant_segments = relevant_segments
+            .into_iter()
+            .map(|(_, segment)| segment)
+            .collect();
+
+        Ok(relevant_segments)
     }
 }
 
@@ -482,9 +640,6 @@ pub struct Region {
 
     /// Sequence number for the next log
     next_sequence_num: AtomicU64,
-
-    /// The latest segment for appending logs
-    current_segment: Mutex<Arc<Mutex<Segment>>>,
 }
 
 impl Region {
@@ -557,6 +712,7 @@ impl Region {
             all_segments: Mutex::new(all_segments),
             cache: Mutex::new(VecDeque::new()),
             cache_size,
+            current_segment: Mutex::new(latest_segment),
         };
 
         Ok(Self {
@@ -568,17 +724,29 @@ impl Region {
             region_dir,
             next_sequence_num: AtomicU64::new(next_sequence_num),
             runtime,
-            current_segment: Mutex::new(latest_segment),
         })
     }
 
+    fn create_new_segment(&self, id: u64) -> Result<Arc<Mutex<Segment>>> {
+        // Create a new segment
+        let new_segment = Segment::new(
+            format!("{}/segment_{}.wal", self.region_dir, id),
+            id,
+            self.segment_size,
+        )?;
+        let new_segment = Arc::new(Mutex::new(new_segment));
+        self.segment_manager.add_segment(id, new_segment.clone())?;
+
+        Ok(new_segment)
+    }
+
     pub fn write(&self, _ctx: &WriteContext, batch: &LogWriteBatch) -> 
Result<SequenceNumber> {
         // In the WAL based on local storage, we need to ensure the sequence 
number in
         // segment is monotonically increasing. So we need to acquire a lock 
here.
         // Perhaps we could avoid acquiring the lock here and instead allocate 
the
         // position that needs to be written in the segment, then fill it 
within
         // spawn_blocking. However, I’m not sure about the correctness of this 
approach.
-        let mut current_segment = self.current_segment.lock().unwrap();
+        let mut current_segment = 
self.segment_manager.current_segment.lock().unwrap();
 
         let entries_num = batch.len() as u64;
         let table_id = batch.location.table_id;
@@ -608,35 +776,23 @@ impl Region {
             next_sequence_num += 1;
         }
 
-        let guard = current_segment.lock().unwrap();
-
         // Check if the current segment has enough space for the new data
         // If not, create a new segment and update current_segment
-        if guard.current_size + data.len() > guard.segment_size {
-            let new_segment_id = guard.id + 1;
-            // We need to drop guard to allow the update of current_segment
-            drop(guard);
-
-            // Create a new segment
-            let new_segment = Segment::new(
-                format!("{}/segment_{}.wal", self.region_dir, new_segment_id),
-                new_segment_id,
-                self.segment_size,
-            )?;
-            let new_segment = Arc::new(Mutex::new(new_segment));
-            self.segment_manager
-                .add_segment(new_segment_id, new_segment.clone())?;
+        {
+            let guard = current_segment.lock().unwrap();
+            if guard.current_size + data.len() > guard.segment_size {
+                let new_segment_id = guard.id + 1;
+                drop(guard);
 
-            // Update current segment
-            *current_segment = new_segment;
-        } else {
-            drop(guard);
+                *current_segment = self.create_new_segment(new_segment_id)?;
+            }
         }
 
+        // Open the segment if not opened
         let mut guard = current_segment.lock().unwrap();
+        self.segment_manager
+            .open_segment(&mut guard, current_segment.clone())?;
 
-        // Open the segment if not opened
-        self.segment_manager.open_segment(&mut guard)?;
         for pos in record_position.iter_mut() {
             pos.start += guard.current_size;
             pos.end += guard.current_size;
@@ -646,8 +802,9 @@ impl Region {
         guard.append_records(
             &data,
             &mut record_position,
+            table_id,
             prev_sequence_num,
-            next_sequence_num - 1,
+            next_sequence_num,
         )?;
         Ok(next_sequence_num - 1)
     }
@@ -742,6 +899,8 @@ impl RegionManager {
         segment_size: usize,
         runtime: Arc<Runtime>,
     ) -> Result<Self> {
+        fs::create_dir_all(&root_dir).context(DirOpen)?;
+
         let mut regions = HashMap::new();
 
         // Naming conversion: <root_dir>/<region_id>
@@ -864,6 +1023,9 @@ struct SegmentLogIterator {
     /// Optional identifier for the table, which is used to filter logs.
     table_id: Option<TableId>,
 
+    /// A hashmap of start and end sequence number for each table.
+    table_ranges: HashMap<TableId, (SequenceNumber, SequenceNumber)>,
+
     /// Starting sequence number for log iteration.
     start: SequenceNumber,
 
@@ -887,17 +1049,12 @@ impl SegmentLogIterator {
         start: SequenceNumber,
         end: SequenceNumber,
     ) -> Result<Self> {
+        let mut guard = segment.lock().unwrap();
         // Open the segment if it is not open
-        let mut segment = segment.lock().unwrap();
-        if !segment.is_open() {
-            segment_manager.open_segment(&mut segment)?;
-        }
-
-        // Read the entire content of the segment
-        let segment_content = segment.read(0, segment.current_size)?;
-
-        // Get record positions
-        let record_positions = segment.record_position.clone();
+        segment_manager.open_segment(&mut guard, segment.clone())?;
+        let segment_content = guard.read(0, guard.current_size)?;
+        let record_positions = guard.record_position.clone();
+        let table_ranges = guard.table_ranges.clone();
 
         Ok(Self {
             log_encoding,
@@ -905,6 +1062,7 @@ impl SegmentLogIterator {
             segment_content,
             record_positions,
             table_id,
+            table_ranges,
             start,
             end,
             current_record_idx: 0,
@@ -952,6 +1110,15 @@ impl SegmentLogIterator {
                 }
             }
 
+            // Filter by sequence range
+            if let Some((start, end)) = 
&self.table_ranges.get(&record.table_id) {
+                if record.sequence_num < *start || record.sequence_num > *end {
+                    continue;
+                }
+            } else {
+                continue;
+            }
+
             // Decode the value
             let value = self
                 .log_encoding
@@ -975,7 +1142,7 @@ pub struct MultiSegmentLogIterator {
     segment_manager: Arc<SegmentManager>,
 
     /// All segments involved in this read operation.
-    segments: Vec<u64>,
+    segments: Vec<Arc<Mutex<Segment>>>,
 
     /// Current segment index.
     current_segment_idx: usize,
@@ -1011,22 +1178,7 @@ impl MultiSegmentLogIterator {
         start: SequenceNumber,
         end: SequenceNumber,
     ) -> Result<Self> {
-        // Find all segments that contain the requested sequence numbers
-        let mut relevant_segments = Vec::new();
-
-        {
-            let all_segments = segment_manager.all_segments.lock().unwrap();
-
-            for (_, segment) in all_segments.iter() {
-                let segment = segment.lock().unwrap();
-                if segment.min_seq <= end && segment.max_seq >= start {
-                    relevant_segments.push(segment.id);
-                }
-            }
-        }
-
-        // Sort by segment id
-        relevant_segments.sort_unstable();
+        let relevant_segments = 
segment_manager.get_relevant_segments(table_id, start, end)?;
 
         let mut iter = Self {
             segment_manager,
@@ -1053,8 +1205,7 @@ impl MultiSegmentLogIterator {
             return Ok(false);
         }
 
-        let segment = self.segments[self.current_segment_idx];
-        let segment = self.segment_manager.get_segment(segment)?;
+        let segment = self.segments[self.current_segment_idx].clone();
         let iterator = SegmentLogIterator::new(
             self.log_encoding.clone(),
             self.record_encoding.clone(),
@@ -1129,7 +1280,7 @@ mod tests {
         assert_eq!(segment.version, NEWEST_WAL_SEGMENT_VERSION);
         assert_eq!(segment.path, path);
         assert_eq!(segment.id, 0);
-        assert_eq!(segment.current_size, SEGMENT_HEADER.len());
+        assert_eq!(segment.current_size, SEGMENT_HEADER.len() + VERSION_SIZE);
 
         let segment_content = fs::read(path).unwrap();
         assert_eq!(segment_content[0], NEWEST_WAL_SEGMENT_VERSION);
@@ -1153,6 +1304,10 @@ mod tests {
         segment
             .open()
             .expect("Expected to open segment successfully");
+
+        segment
+            .restore_meta()
+            .expect("Expected to restore meta successfully");
     }
 
     #[test]
@@ -1166,6 +1321,7 @@ mod tests {
             .to_string();
         let mut segment = Segment::new(path.clone(), 0, SEGMENT_SIZE).unwrap();
         segment.open().unwrap();
+        segment.restore_meta().unwrap();
 
         let data = b"test_data";
         let append_result = segment.append(data);
@@ -1176,6 +1332,26 @@ mod tests {
         assert_eq!(read_result.unwrap(), data);
     }
 
+    #[test]
+    fn test_segment_delete() {
+        let dir = tempdir().unwrap();
+
+        let path = dir
+            .path()
+            .join("segment_0.wal")
+            .to_str()
+            .unwrap()
+            .to_string();
+
+        let mut segment = Segment::new(path.clone(), 0, SEGMENT_SIZE).unwrap();
+
+        segment.open().unwrap();
+        assert!(Path::new(&path).exists());
+
+        segment.delete().unwrap();
+        assert!(!Path::new(&path).exists());
+    }
+
     #[test]
     fn test_region_create_and_close() {
         let dir = tempdir().unwrap();
@@ -1190,8 +1366,6 @@ mod tests {
         )
         .unwrap();
 
-        let _segment = region.segment_manager.get_segment(0).unwrap();
-
         region.close().unwrap()
     }
 
@@ -1300,4 +1474,70 @@ mod tests {
         let runtime = Arc::new(Builder::default().build().unwrap());
         
runtime.block_on(test_multi_segment_write_and_read_inner(runtime.clone()));
     }
+
+    #[test]
+    fn test_region_mark_delete_entries_up_to() {
+        const SEGMENT_SIZE: usize = 4096;
+
+        let dir = tempdir().unwrap();
+        let runtime = Arc::new(Builder::default().build().unwrap());
+
+        // Create a new region
+        let region = Region::new(
+            1,
+            2,
+            SEGMENT_SIZE,
+            dir.path().to_str().unwrap().to_string(),
+            runtime.clone(),
+        )
+        .unwrap();
+        let region = Arc::new(region);
+
+        // Write some log entries
+        let location = WalLocation::new(1, 1); // region_id = 1, table_id = 1
+        let mut sequence = MIN_SEQUENCE_NUMBER + 1;
+
+        for _i in 0..10 {
+            let log_entries = 0..100;
+            let log_batch_encoder = LogBatchEncoder::create(location);
+            let log_batch = log_batch_encoder
+                .encode_batch(log_entries.clone().map(|v| MemoryPayload { val: 
v }))
+                .expect("should succeed to encode payloads");
+
+            let write_ctx = WriteContext::default();
+            let actual_sequence = region.write(&write_ctx, 
&log_batch).unwrap();
+            assert_eq!(actual_sequence, sequence + 100 - 1);
+            sequence += 100;
+        }
+
+        let latest_segment_id = {
+            let all_segments = 
region.segment_manager.all_segments.lock().unwrap();
+            // Expect more than one segment
+            assert!(
+                all_segments.len() > 1,
+                "Expected multiple segments, but got {}",
+                all_segments.len()
+            );
+            all_segments.keys().max().unwrap().to_owned()
+        };
+
+        // Mark delete entries up to sequence - 1, so only the last segment 
should
+        // remain
+        let mark_delete_sequence = sequence - 1;
+        region
+            .mark_delete_entries_up_to(location, mark_delete_sequence)
+            .unwrap();
+
+        {
+            let all_segments = 
region.segment_manager.all_segments.lock().unwrap();
+            assert_eq!(all_segments.len(), 1);
+            assert!(all_segments.contains_key(&latest_segment_id));
+        }
+
+        // The num of segment in the dir should be 1
+        let segment_count = fs::read_dir(dir.path()).unwrap().count();
+        assert_eq!(segment_count, 1);
+
+        region.close().unwrap();
+    }
 }
diff --git a/src/wal/src/local_storage_impl/wal_manager.rs 
b/src/wal/src/local_storage_impl/wal_manager.rs
index 91c69fcd..694831ea 100644
--- a/src/wal/src/local_storage_impl/wal_manager.rs
+++ b/src/wal/src/local_storage_impl/wal_manager.rs
@@ -54,14 +54,14 @@ impl LocalStorageImpl {
     ) -> Result<Self> {
         let LocalStorageConfig {
             cache_size,
-            max_segment_size,
+            segment_size,
             ..
         } = config.clone();
         let wal_path_str = wal_path.to_str().unwrap().to_string();
         let region_manager = RegionManager::new(
             wal_path_str.clone(),
             cache_size,
-            max_segment_size,
+            segment_size,
             runtime.clone(),
         )
         .box_err()
@@ -104,6 +104,10 @@ impl WalManager for LocalStorageImpl {
         location: WalLocation,
         sequence_num: SequenceNumber,
     ) -> Result<()> {
+        debug!(
+            "Mark delete entries up to {} for location:{:?}",
+            sequence_num, location
+        );
         self.region_manager
             .mark_delete_entries_up_to(location, sequence_num)
             .box_err()
@@ -111,10 +115,7 @@ impl WalManager for LocalStorageImpl {
     }
 
     async fn close_region(&self, region_id: RegionId) -> Result<()> {
-        debug!(
-            "Close region for LocalStorage based WAL is noop operation, 
region_id:{}",
-            region_id
-        );
+        debug!("Close region {} for LocalStorage based WAL", region_id);
         self.region_manager
             .close(region_id)
             .box_err()
@@ -133,10 +134,15 @@ impl WalManager for LocalStorageImpl {
         ctx: &ReadContext,
         req: &ReadRequest,
     ) -> Result<BatchLogIteratorAdapter> {
+        debug!(
+            "Read batch from LocalStorage based WAL, ctx:{:?}, req:{:?}",
+            ctx, req
+        );
         self.region_manager.read(ctx, req).box_err().context(Read)
     }
 
     async fn write(&self, ctx: &WriteContext, batch: &LogWriteBatch) -> 
Result<SequenceNumber> {
+        debug!("Write batch to LocalStorage based WAL, ctx:{:?}", ctx);
         self.region_manager
             .write(ctx, batch)
             .box_err()
@@ -144,6 +150,10 @@ impl WalManager for LocalStorageImpl {
     }
 
     async fn scan(&self, ctx: &ScanContext, req: &ScanRequest) -> 
Result<BatchLogIteratorAdapter> {
+        debug!(
+            "Scan from LocalStorage based WAL, ctx:{:?}, req:{:?}",
+            ctx, req
+        );
         self.region_manager.scan(ctx, req).box_err().context(Read)
     }
 
@@ -181,7 +191,7 @@ impl WalsOpener for LocalStorageWalsOpener {
         };
 
         let write_runtime = runtimes.write_runtime.clone();
-        let data_path = Path::new(&local_storage_wal_config.path);
+        let data_path = Path::new(&local_storage_wal_config.data_dir);
 
         let data_wal = if config.disable_data {
             Arc::new(crate::dummy::DoNothing)
diff --git a/src/wal/tests/read_write.rs b/src/wal/tests/read_write.rs
index cccde53c..24c4c75f 100644
--- a/src/wal/tests/read_write.rs
+++ b/src/wal/tests/read_write.rs
@@ -72,7 +72,6 @@ fn test_kafka_wal() {
 }
 
 #[test]
-#[ignore = "this test cannot pass completely, since delete is not supported 
yet"]
 fn test_local_storage_wal() {
     let builder = LocalStorageWalBuilder;
     test_all(builder, false);
@@ -997,7 +996,7 @@ impl WalBuilder for LocalStorageWalBuilder {
 
     async fn build(&self, data_path: &Path, runtime: Arc<Runtime>) -> 
Arc<Self::Wal> {
         let config = LocalStorageConfig {
-            path: data_path.to_str().unwrap().to_string(),
+            data_dir: data_path.to_str().unwrap().to_string(),
             ..LocalStorageConfig::default()
         };
         Arc::new(LocalStorageImpl::new(data_path.to_path_buf(), config, 
runtime).unwrap())


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to