jiacai2050 commented on code in PR #1566:
URL: https://github.com/apache/horaedb/pull/1566#discussion_r1756527410


##########
src/wal/src/local_storage_impl/segment.rs:
##########
@@ -454,6 +516,89 @@ impl SegmentManager {
         }
         Ok(())
     }
+
+    pub fn mark_delete_entries_up_to(
+        &self,
+        location: WalLocation,
+        sequence_num: SequenceNumber,
+    ) -> Result<()> {
+        let current_segment_id = 
self.current_segment.lock().unwrap().lock().unwrap().id;
+        let mut all_segments = self.all_segments.lock().unwrap();
+        let mut segments_to_remove = Vec::new();
+
+        for (_, segment) in all_segments.iter() {
+            let mut guard = segment.lock().unwrap();
+
+            guard.mark_deleted(location.table_id, sequence_num);
+
+            // Delete this segment if it is empty
+            if guard.is_empty() && guard.id != current_segment_id {
+                let mut cache = self.cache.lock().unwrap();
+
+                // Check if segment is already in cache
+                if let Some(index) = cache.iter().position(|(id, _)| *id == 
guard.id) {
+                    cache.remove(index);
+                }
+
+                segments_to_remove.push((guard.id, segment.clone()));
+            }
+        }
+
+        // Delete segments in all_segments
+        for (segment_id, _) in segments_to_remove.iter() {
+            all_segments.remove(segment_id);
+        }
+
+        drop(all_segments);
+
+        // Delete segments on disk
+        for (_, segment) in segments_to_remove.iter() {

Review Comment:
   values()



##########
src/wal/src/local_storage_impl/segment.rs:
##########
@@ -1300,4 +1464,63 @@ mod tests {
         let runtime = Arc::new(Builder::default().build().unwrap());
         
runtime.block_on(test_multi_segment_write_and_read_inner(runtime.clone()));
     }
+
+    #[test]
+    fn test_region_mark_delete_entries_up_to() {
+        const SEGMENT_SIZE: usize = 4096;
+
+        let dir = tempdir().unwrap();
+        let runtime = Arc::new(Builder::default().build().unwrap());
+
+        // Create a new region
+        let region = Region::new(
+            1,
+            2,
+            SEGMENT_SIZE,
+            dir.path().to_str().unwrap().to_string(),
+            runtime.clone(),
+        )
+        .unwrap();
+        let region = Arc::new(region);
+
+        // Write some log entries
+        let location = WalLocation::new(1, 1); // region_id = 1, table_id = 1
+        let mut sequence = MIN_SEQUENCE_NUMBER + 1;
+
+        for _i in 0..10 {
+            let log_entries = 0..100;
+            let log_batch_encoder = LogBatchEncoder::create(location);
+            let log_batch = log_batch_encoder
+                .encode_batch(log_entries.clone().map(|v| MemoryPayload { val: 
v }))
+                .expect("should succeed to encode payloads");
+
+            let write_ctx = WriteContext::default();
+            region.write(&write_ctx, &log_batch).unwrap();

Review Comment:
   We need to check that the returned sequence number matches the expected value.



##########
src/wal/src/local_storage_impl/segment.rs:
##########
@@ -454,6 +516,89 @@ impl SegmentManager {
         }
         Ok(())
     }
+
+    pub fn mark_delete_entries_up_to(
+        &self,
+        location: WalLocation,
+        sequence_num: SequenceNumber,
+    ) -> Result<()> {
+        let current_segment_id = 
self.current_segment.lock().unwrap().lock().unwrap().id;
+        let mut all_segments = self.all_segments.lock().unwrap();
+        let mut segments_to_remove = Vec::new();
+
+        for (_, segment) in all_segments.iter() {
+            let mut guard = segment.lock().unwrap();
+
+            guard.mark_deleted(location.table_id, sequence_num);
+
+            // Delete this segment if it is empty
+            if guard.is_empty() && guard.id != current_segment_id {

Review Comment:
   Since the current segment may have changed, the check here should be `guard.id < 
current_segment_id`.



##########
src/wal/src/local_storage_impl/segment.rs:
##########
@@ -1300,4 +1464,63 @@ mod tests {
         let runtime = Arc::new(Builder::default().build().unwrap());
         
runtime.block_on(test_multi_segment_write_and_read_inner(runtime.clone()));
     }
+
+    #[test]
+    fn test_region_mark_delete_entries_up_to() {
+        const SEGMENT_SIZE: usize = 4096;
+
+        let dir = tempdir().unwrap();
+        let runtime = Arc::new(Builder::default().build().unwrap());
+
+        // Create a new region
+        let region = Region::new(
+            1,
+            2,
+            SEGMENT_SIZE,
+            dir.path().to_str().unwrap().to_string(),
+            runtime.clone(),
+        )
+        .unwrap();
+        let region = Arc::new(region);
+
+        // Write some log entries
+        let location = WalLocation::new(1, 1); // region_id = 1, table_id = 1
+        let mut sequence = MIN_SEQUENCE_NUMBER + 1;
+
+        for _i in 0..10 {
+            let log_entries = 0..100;
+            let log_batch_encoder = LogBatchEncoder::create(location);
+            let log_batch = log_batch_encoder
+                .encode_batch(log_entries.clone().map(|v| MemoryPayload { val: 
v }))
+                .expect("should succeed to encode payloads");
+
+            let write_ctx = WriteContext::default();
+            region.write(&write_ctx, &log_batch).unwrap();
+            sequence += 100;
+        }
+
+        // Expect more than one segment
+        {
+            let all_segments = 
region.segment_manager.all_segments.lock().unwrap();
+            assert!(
+                all_segments.len() > 1,
+                "Expected multiple segments, but got {}",
+                all_segments.len()
+            );
+        }
+
+        // Mark delete entries up to sequence - 1, so only the last segment 
should
+        // remain
+        let mark_delete_sequence = sequence - 1;
+        region
+            .mark_delete_entries_up_to(location, mark_delete_sequence)
+            .unwrap();
+
+        {
+            let all_segments = 
region.segment_manager.all_segments.lock().unwrap();
+            assert_eq!(all_segments.len(), 1);

Review Comment:
   We also need to check that the number of files inside the WAL directory is 1.



##########
src/wal/src/local_storage_impl/segment.rs:
##########
@@ -454,6 +516,89 @@ impl SegmentManager {
         }
         Ok(())
     }
+
+    pub fn mark_delete_entries_up_to(
+        &self,
+        location: WalLocation,
+        sequence_num: SequenceNumber,
+    ) -> Result<()> {
+        let current_segment_id = 
self.current_segment.lock().unwrap().lock().unwrap().id;
+        let mut all_segments = self.all_segments.lock().unwrap();
+        let mut segments_to_remove = Vec::new();
+
+        for (_, segment) in all_segments.iter() {
+            let mut guard = segment.lock().unwrap();
+
+            guard.mark_deleted(location.table_id, sequence_num);
+
+            // Delete this segment if it is empty
+            if guard.is_empty() && guard.id != current_segment_id {
+                let mut cache = self.cache.lock().unwrap();
+
+                // Check if segment is already in cache
+                if let Some(index) = cache.iter().position(|(id, _)| *id == 
guard.id) {
+                    cache.remove(index);
+                }
+
+                segments_to_remove.push((guard.id, segment.clone()));
+            }
+        }
+
+        // Delete segments in all_segments
+        for (segment_id, _) in segments_to_remove.iter() {

Review Comment:
   keys()



##########
src/wal/src/local_storage_impl/segment.rs:
##########
@@ -383,10 +465,13 @@ pub struct SegmentManager {
     all_segments: Mutex<HashMap<u64, Arc<Mutex<Segment>>>>,
 
     /// Cache for opened segments
-    cache: Mutex<VecDeque<u64>>,
+    cache: Mutex<VecDeque<(u64, Arc<Mutex<Segment>>)>>,

Review Comment:
   Why add the `Arc<Mutex<Segment>>` here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to