This is an automated email from the ASF dual-hosted git repository.

baojinri pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/horaedb.git


The following commit(s) were added to refs/heads/main by this push:
     new 58db4f5f refactor: use `Read` trait to convert Bytes to Snapshot (#1612)
58db4f5f is described below

commit 58db4f5f99da6af0f184e47a590b7de44ff7d5e9
Author: Jiacai Liu <d...@liujiacai.net>
AuthorDate: Wed Dec 18 11:55:24 2024 +0800

    refactor: use `Read` trait to convert Bytes to Snapshot (#1612)
    
    ## Rationale
    
    
    ## Detailed Changes
    
    
    ## Test Plan
    CI
---
 .github/workflows/ci.yml                   |   6 --
 src/metric_engine/src/manifest/encoding.rs | 148 +++++++++++------------------
 2 files changed, 58 insertions(+), 96 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 73775bc6..ea01d87e 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -57,9 +57,6 @@ jobs:
       - uses: actions/checkout@v4
         with:
           submodules: true
-      - name: Release Disk Quota
-        run: |
-          sudo make ensure-disk-quota
       - name: Setup Build Environment
         run: |
           sudo apt update
@@ -81,9 +78,6 @@ jobs:
     timeout-minutes: 60
     steps:
       - uses: actions/checkout@v4
-      - name: Release Disk Quota
-        run: |
-          sudo make ensure-disk-quota
       - name: Setup Build Environment
         run: |
           sudo apt update
diff --git a/src/metric_engine/src/manifest/encoding.rs b/src/metric_engine/src/manifest/encoding.rs
index 72d43d6e..9ec0ecd6 100644
--- a/src/metric_engine/src/manifest/encoding.rs
+++ b/src/metric_engine/src/manifest/encoding.rs
@@ -15,12 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::io::{Cursor, Write};
+use std::io::{Cursor, Read, Write};
 
 use anyhow::Context;
 use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
-use bytes::Bytes;
-use parquet::data_type::AsBytes;
+use bytes::{Buf, Bytes};
 
 use crate::{
     ensure,
@@ -88,7 +87,7 @@ impl From<ManifestUpdate> for pb_types::ManifestUpdate {
 /// - The length field (u64) represents the total length of the subsequent
 ///   records and serves as a straightforward method for verifying their
 ///   integrity. (length = record_length * record_count)
-#[derive(Debug)]
+#[derive(Debug, PartialEq, Eq)]
 pub struct SnapshotHeader {
     pub magic: u32,
     pub version: u8,
@@ -96,27 +95,33 @@ pub struct SnapshotHeader {
     pub length: u64,
 }
 
-impl TryFrom<&[u8]> for SnapshotHeader {
-    type Error = Error;
+impl SnapshotHeader {
+    pub const LENGTH: usize = 4 /*magic*/ + 1 /*version*/ + 1 /*flag*/ + 8 
/*length*/;
+    pub const MAGIC: u32 = 0xCAFE_1234;
 
-    fn try_from(bytes: &[u8]) -> Result<Self> {
-        ensure!(
-            bytes.len() >= Self::LENGTH,
-            "invalid bytes, length: {}",
-            bytes.len()
-        );
+    pub fn new(length: u64) -> Self {
+        Self {
+            magic: SnapshotHeader::MAGIC,
+            version: SnapshotRecord::VERSION,
+            flag: 0,
+            length,
+        }
+    }
 
-        let mut cursor = Cursor::new(bytes);
-        let magic = cursor
+    pub fn try_new<R>(mut reader: R) -> Result<Self>
+    where
+        R: Read,
+    {
+        let magic = reader
             .read_u32::<LittleEndian>()
             .context("read snapshot header magic")?;
         ensure!(
             magic == SnapshotHeader::MAGIC,
             "invalid bytes to convert to header."
         );
-        let version = cursor.read_u8().context("read snapshot header 
version")?;
-        let flag = cursor.read_u8().context("read snapshot header flag")?;
-        let length = cursor
+        let version = reader.read_u8().context("read snapshot header 
version")?;
+        let flag = reader.read_u8().context("read snapshot header flag")?;
+        let length = reader
             .read_u64::<LittleEndian>()
             .context("read snapshot header length")?;
         Ok(Self {
@@ -126,20 +131,6 @@ impl TryFrom<&[u8]> for SnapshotHeader {
             length,
         })
     }
-}
-
-impl SnapshotHeader {
-    pub const LENGTH: usize = 4 /*magic*/ + 1 /*version*/ + 1 /*flag*/ + 8 
/*length*/;
-    pub const MAGIC: u32 = 0xCAFE_1234;
-
-    pub fn new(length: u64) -> Self {
-        Self {
-            magic: SnapshotHeader::MAGIC,
-            version: SnapshotRecord::VERSION,
-            flag: 0,
-            length,
-        }
-    }
 
     pub fn write_to<W>(&self, mut writer: W) -> Result<()>
     where
@@ -167,6 +158,7 @@ impl SnapshotHeader {
 /// | id(u64) | time_range(i64*2) | size(u32)  |  num_rows(u32)  |
 /// +---------+-------------------+------------+-----------------+
 /// ```
+#[derive(Debug, PartialEq, Eq)]
 pub struct SnapshotRecord {
     id: u64,
     time_range: TimeRange,
@@ -216,30 +208,24 @@ impl From<SstFile> for SnapshotRecord {
     }
 }
 
-impl TryFrom<&[u8]> for SnapshotRecord {
-    type Error = Error;
-
-    fn try_from(value: &[u8]) -> Result<Self> {
-        ensure!(
-            value.len() >= SnapshotRecord::LENGTH,
-            "invalid value len: {}",
-            value.len()
-        );
-
-        let mut cursor = Cursor::new(value);
-        let id = cursor
+impl SnapshotRecord {
+    fn try_new<R>(mut reader: R) -> Result<Self>
+    where
+        R: Read,
+    {
+        let id = reader
             .read_u64::<LittleEndian>()
             .context("read record id")?;
-        let start = cursor
+        let start = reader
             .read_i64::<LittleEndian>()
             .context("read record start")?;
-        let end = cursor
+        let end = reader
             .read_i64::<LittleEndian>()
             .context("read record end")?;
-        let size = cursor
+        let size = reader
             .read_u32::<LittleEndian>()
             .context("read record size")?;
-        let num_rows = cursor
+        let num_rows = reader
             .read_u32::<LittleEndian>()
             .context("read record num_rows")?;
         Ok(SnapshotRecord {
@@ -286,21 +272,20 @@ impl TryFrom<Bytes> for Snapshot {
         if bytes.is_empty() {
             return Ok(Snapshot::default());
         }
-        let header = SnapshotHeader::try_from(bytes.as_bytes())?;
+        let bytes_len = bytes.len();
+        let mut cursor = Cursor::new(bytes);
+        let header = SnapshotHeader::try_new(&mut cursor)?;
         let record_total_length = header.length as usize;
         ensure!(
             record_total_length > 0
                 && record_total_length % SnapshotRecord::LENGTH == 0
-                && record_total_length + SnapshotHeader::LENGTH == bytes.len(),
-            "create snapshot from bytes failed, header:{header:?}, 
bytes_length: {}",
-            bytes.len()
+                && record_total_length + SnapshotHeader::LENGTH == bytes_len,
+            "create snapshot from bytes failed, header:{header:?}, 
bytes_length: {bytes_len}",
         );
-        let mut index = SnapshotHeader::LENGTH;
         let mut records = Vec::with_capacity(record_total_length / 
SnapshotRecord::LENGTH);
-        while index < bytes.len() {
-            let record = SnapshotRecord::try_from(&bytes[index..index + 
SnapshotRecord::LENGTH])?;
+        while cursor.has_remaining() {
+            let record = SnapshotRecord::try_new(&mut cursor)?;
             records.push(record);
-            index += SnapshotRecord::LENGTH;
         }
 
         Ok(Self { header, records })
@@ -351,23 +336,17 @@ mod tests {
         let mut writer = vec.as_mut_slice();
         header.write_to(&mut writer).unwrap();
         assert!(writer.is_empty());
-        let mut cursor = Cursor::new(vec);
+        let cursor = Cursor::new(vec);
+        let header = SnapshotHeader::try_new(cursor).unwrap();
 
         assert_eq!(
-            SnapshotHeader::MAGIC,
-            cursor.read_u32::<LittleEndian>().unwrap()
-        );
-        assert_eq!(
-            1, // version
-            cursor.read_u8().unwrap()
-        );
-        assert_eq!(
-            0, // flag
-            cursor.read_u8().unwrap()
-        );
-        assert_eq!(
-            257, // length
-            cursor.read_u64::<LittleEndian>().unwrap()
+            SnapshotHeader {
+                magic: SnapshotHeader::MAGIC,
+                version: 1,
+                flag: 0,
+                length: 257
+            },
+            header
         );
     }
 
@@ -388,27 +367,16 @@ mod tests {
         record.write_to(&mut writer).unwrap();
 
         assert!(writer.is_empty());
-        let mut cursor = Cursor::new(vec);
-
-        assert_eq!(
-            99, // id
-            cursor.read_u64::<LittleEndian>().unwrap()
-        );
-        assert_eq!(
-            100, // start range
-            cursor.read_i64::<LittleEndian>().unwrap()
-        );
+        let cursor = Cursor::new(vec);
+        let record = SnapshotRecord::try_new(cursor).unwrap();
         assert_eq!(
-            200, // end range
-            cursor.read_i64::<LittleEndian>().unwrap()
-        );
-        assert_eq!(
-            938, // size
-            cursor.read_u32::<LittleEndian>().unwrap()
-        );
-        assert_eq!(
-            100, // num rows
-            cursor.read_u32::<LittleEndian>().unwrap()
+            SnapshotRecord {
+                id: 99,
+                time_range: (100..200).into(),
+                size: 938,
+                num_rows: 100
+            },
+            record
         );
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@horaedb.apache.org
For additional commands, e-mail: commits-h...@horaedb.apache.org

Reply via email to