jiacai2050 commented on code in PR #1556:
URL: https://github.com/apache/horaedb/pull/1556#discussion_r1740281726


##########
src/wal/src/local_storage_impl/segment.rs:
##########
@@ -314,50 +348,53 @@ impl Segment {
     }
 }
 
-pub struct Region {
-    /// Identifier for regions.
-    _region_id: u64,
-
+#[derive(Debug)]
+pub struct SegmentManager {
     /// All segments protected by a mutex
-    /// todo: maybe use a RWLock?
-    all_segments: Mutex<HashMap<u64, Arc<Mutex<Segment>>>>,
+    all_segments: Mutex<HashMap<u64, Arc<RwLock<Segment>>>>,
 
     /// Cache for opened segments
     cache: Mutex<VecDeque<u64>>,
 
     /// Maximum size of the cache
     cache_size: usize,
 
+    /// Maximum size of a segment file
+    max_file_size: u64,

Review Comment:
   ```
   // All segments are fixed size
   segment_size, 
   ```



##########
src/wal/src/local_storage_impl/segment.rs:
##########
@@ -502,14 +542,38 @@ impl Region {
             next_sequence_num += 1;
         }
 
-        // TODO: spawn a new task to write to segment
-        // TODO: maybe need a write mutex?
+        {
+            let segment = self.get_segment(*current)?;
+            let segment = segment.read().unwrap();
+
+            // Check if the current segment has enough space for the new data
+            // If not, create a new segment and update the current segment ID
+            if segment.size + data.len() as u64 > segment.max_file_size {
+                let new_segment_id = *current + 1;
+                let new_segment = Segment::new(
+                    format!("{}/segment_{}.wal", self._segment_dir, 
new_segment_id),
+                    new_segment_id,
+                    self.max_file_size,
+                )?;
+                let new_segment = Arc::new(RwLock::new(new_segment));
+
+                let mut all_segments = self.all_segments.lock().unwrap();
+                all_segments.insert(new_segment_id, new_segment.clone());
+                *current = new_segment_id;
+            }
+        }
+
+        let segment = self.get_segment(*current)?;
+        let mut segment = segment.write().unwrap();

Review Comment:
   When you acquire the write lock here, the condition checked above may no 
longer hold. You have to check it again.



##########
src/wal/src/local_storage_impl/segment.rs:
##########
@@ -502,14 +542,38 @@ impl Region {
             next_sequence_num += 1;
         }
 
-        // TODO: spawn a new task to write to segment
-        // TODO: maybe need a write mutex?
+        {
+            let segment = self.get_segment(*current)?;

Review Comment:
   In this block, current's lock is still held, so there is no race condition 
here?



##########
src/wal/src/local_storage_impl/segment.rs:
##########
@@ -518,9 +582,87 @@ impl Region {
 
         // Append logs to segment file
         segment.append(&data)?;
+

Review Comment:
   We need to write data first, then update position/seq.



##########
src/wal/src/local_storage_impl/segment.rs:
##########
@@ -502,14 +542,38 @@ impl Region {
             next_sequence_num += 1;
         }
 
-        // TODO: spawn a new task to write to segment
-        // TODO: maybe need a write mutex?
+        {
+            let segment = self.get_segment(*current)?;
+            let segment = segment.read().unwrap();
+
+            // Check if the current segment has enough space for the new data
+            // If not, create a new segment and update the current segment ID
+            if segment.size + data.len() as u64 > segment.max_file_size {
+                let new_segment_id = *current + 1;
+                let new_segment = Segment::new(
+                    format!("{}/segment_{}.wal", self._segment_dir, 
new_segment_id),
+                    new_segment_id,
+                    self.max_file_size,
+                )?;
+                let new_segment = Arc::new(RwLock::new(new_segment));
+
+                let mut all_segments = self.all_segments.lock().unwrap();
+                all_segments.insert(new_segment_id, new_segment.clone());
+                *current = new_segment_id;
+            }
+        }
+
+        let segment = self.get_segment(*current)?;
+        let mut segment = segment.write().unwrap();
 
         for pos in record_position.iter_mut() {
             pos.start += segment.size;
             pos.end += segment.size;
         }
 
+        // TODO: spawn a new task to write to segment

Review Comment:
   Are you worried that this write will block the whole WAL write?
   
   If so, we usually don't flush on every write, so the data only goes to the 
kernel's page cache and the performance is acceptable.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to