This is an automated email from the ASF dual-hosted git repository.

jiacai2050 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/horaedb.git


The following commit(s) were added to refs/heads/main by this push:
     new 25dbe738 feat: another format for manifest (#1604)
25dbe738 is described below

commit 25dbe738f384d36fe3a89a5f0f278b6dd106f144
Author: MianChen <[email protected]>
AuthorDate: Thu Dec 12 09:12:35 2024 -0600

    feat: another format for manifest (#1604)
    
    ## Rationale
    Close #1600
    
    ## Detailed Changes
    Creating a new data format to represent the manifest snapshot file.
    ```text
    | magic(u32) | version(u8) |  flags(u8) | length(u64) | Record(N) ... |
    
    The Magic field (u32) is used to ensure the validity of the data source.
    The Flags field (u8) is reserved for future extensibility, such as enabling 
compression or supporting additional features.
    The length field (u64) represents the total length of the subsequent 
records and serves as a straightforward method for verifying their integrity. 
(length = record_length * record_count)
    
    # Record is a self-descriptive message
    | id(u64) | time_range(i64*2)| size(u32) |  num_rows(u32)|
    ```
    
    In do_merge, the snapshot data is handled like this:
    
    ```text
    Old data flow in do_merge:
                                          delta_sstmetas
                                                 | (extend vec)
                                                 V
    object_store -> org_bytes -> org_pb -> Vec<sstmeta> -> dst_pb -> dst_bytes 
-> object_store
    
    New data flow in do_merge:
                   delta_sstmetas -> bytes
                                      | (append)
                                      V
    object_store -> org_bytes -> dst_bytes -> object_store
    ```
    
    Specifically, I create the SnapshotHeader and SnapshotRecordV1 to
    represent the corresponding data in snapshot bytes. Before merging delta
    sstfiles into new bytes, we allocate a larger `Vec<u8>` and copy each
    segment (header, old records, new records) into it.
    
    This PR DOES NOT address format upgrade logic, which can be resolved in a
    separate PR. As for the upgrade, we could define a new SnapshotRecord
    format and perform data migration in Manifest::try_new.
    
    
    ## Test Plan
    UT
    
    ---------
    
    Co-authored-by: jiacai2050 <[email protected]>
---
 horaedb/Cargo.lock                    |   1 +
 horaedb/Cargo.toml                    |   1 +
 horaedb/metric_engine/Cargo.toml      |   1 +
 horaedb/metric_engine/src/manifest.rs | 454 ++++++++++++++++++++++++++++++++--
 4 files changed, 435 insertions(+), 22 deletions(-)

diff --git a/horaedb/Cargo.lock b/horaedb/Cargo.lock
index 581e0f75..6bb4454e 100644
--- a/horaedb/Cargo.lock
+++ b/horaedb/Cargo.lock
@@ -1665,6 +1665,7 @@ dependencies = [
  "arrow-schema",
  "async-scoped",
  "async-trait",
+ "byteorder",
  "bytes",
  "bytesize",
  "datafusion",
diff --git a/horaedb/Cargo.toml b/horaedb/Cargo.toml
index 6335c512..0bc58ea0 100644
--- a/horaedb/Cargo.toml
+++ b/horaedb/Cargo.toml
@@ -30,6 +30,7 @@ anyhow = { version = "1.0" }
 metric_engine = { path = "metric_engine" }
 thiserror = "1"
 bytes = "1"
+byteorder = "1"
 datafusion = "43"
 parquet = { version = "53" }
 object_store = { version = "0.11" }
diff --git a/horaedb/metric_engine/Cargo.toml b/horaedb/metric_engine/Cargo.toml
index 3be3cdde..0d42c058 100644
--- a/horaedb/metric_engine/Cargo.toml
+++ b/horaedb/metric_engine/Cargo.toml
@@ -38,6 +38,7 @@ async-scoped = { workspace = true }
 async-trait = { workspace = true }
 bytes = { workspace = true }
 bytesize = { workspace = true }
+byteorder = { workspace = true }
 datafusion = { workspace = true }
 futures = { workspace = true }
 itertools = { workspace = true }
diff --git a/horaedb/metric_engine/src/manifest.rs 
b/horaedb/metric_engine/src/manifest.rs
index 37646348..3277558b 100644
--- a/horaedb/metric_engine/src/manifest.rs
+++ b/horaedb/metric_engine/src/manifest.rs
@@ -17,6 +17,7 @@
 
 use std::{
     collections::HashSet,
+    io::{Cursor, Write},
     sync::{
         atomic::{AtomicUsize, Ordering},
         Arc,
@@ -26,9 +27,12 @@ use std::{
 
 use anyhow::Context;
 use async_scoped::TokioScope;
+use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
 use bytes::Bytes;
 use futures::{StreamExt, TryStreamExt};
+use macros::ensure;
 use object_store::{path::Path, PutPayload};
+use parquet::data_type::AsBytes;
 use prost::Message;
 use tokio::{
     runtime::Runtime,
@@ -59,6 +63,7 @@ pub struct Manifest {
     payload: RwLock<Payload>,
 }
 
+#[derive(Default)]
 pub struct Payload {
     files: Vec<SstFile>,
 }
@@ -72,6 +77,20 @@ impl Payload {
     }
 }
 
+impl TryFrom<Bytes> for Payload {
+    type Error = Error;
+
+    fn try_from(value: Bytes) -> Result<Self> {
+        if value.is_empty() {
+            Ok(Self::default())
+        } else {
+            let snapshot = Snapshot::try_from(value)?;
+            let files = snapshot.to_sstfiles()?;
+            Ok(Self { files })
+        }
+    }
+}
+
 impl TryFrom<pb_types::Manifest> for Payload {
     type Error = Error;
 
@@ -125,7 +144,9 @@ impl Manifest {
             });
         }
 
-        let payload = read_snapshot(&store, &snapshot_path).await?;
+        let bytes = read_object(&store, &snapshot_path).await?;
+        // TODO: add upgrade logic
+        let payload = bytes.try_into()?;
 
         Ok(Self {
             delta_dir,
@@ -184,6 +205,287 @@ impl Manifest {
     }
 }
 
+/// The layout for the header.
+/// ```plaintext
+/// +-------------+--------------+------------+--------------+
+/// | magic(u32)  | version(u8)  | flag(u8)   | length(u64)  |
+/// +-------------+--------------+------------+--------------+
+/// ```
+/// - The Magic field (u32) is used to ensure the validity of the data source.
+/// - The Flags field (u8) is reserved for future extensibility, such as
+///   enabling compression or supporting additional features.
+/// - The length field (u64) represents the total length of the subsequent
+///   records and serves as a straightforward method for verifying their
+///   integrity. (length = record_length * record_count)
+struct SnapshotHeader {
+    pub magic: u32,
+    pub version: u8,
+    pub flag: u8,
+    pub length: u64,
+}
+
+impl TryFrom<&[u8]> for SnapshotHeader {
+    type Error = Error;
+
+    fn try_from(bytes: &[u8]) -> Result<Self> {
+        ensure!(
+            bytes.len() >= Self::LENGTH,
+            "invalid bytes, length: {}",
+            bytes.len()
+        );
+
+        let mut cursor = Cursor::new(bytes);
+        let magic = cursor.read_u32::<LittleEndian>().unwrap();
+        ensure!(
+            magic == SnapshotHeader::MAGIC,
+            "invalid bytes to convert to header."
+        );
+        let version = cursor.read_u8().unwrap();
+        let flag = cursor.read_u8().unwrap();
+        let length = cursor.read_u64::<LittleEndian>().unwrap();
+        Ok(Self {
+            magic,
+            version,
+            flag,
+            length,
+        })
+    }
+}
+
+impl SnapshotHeader {
+    pub const LENGTH: usize = 4 /*magic*/ + 1 /*version*/ + 1 /*flag*/ + 8 
/*length*/;
+    pub const MAGIC: u32 = 0xCAFE_6666;
+
+    pub fn new(length: u64) -> Self {
+        Self {
+            magic: SnapshotHeader::MAGIC,
+            version: SnapshotRecordV1::VERSION,
+            flag: 0,
+            length,
+        }
+    }
+
+    pub fn write_to(&self, writer: &mut &mut [u8]) -> Result<()> {
+        ensure!(
+            writer.len() >= SnapshotHeader::LENGTH,
+            "writer buf is too small for writing the header, length: {}",
+            writer.len()
+        );
+
+        writer
+            .write_u32::<LittleEndian>(self.magic)
+            .context("write shall not fail.")?;
+        writer
+            .write_u8(self.version)
+            .context("write shall not fail.")?;
+        writer
+            .write_u8(self.flag)
+            .context("write shall not fail.")?;
+        writer
+            .write_u64::<LittleEndian>(self.length)
+            .context("write shall not fail.")?;
+        Ok(())
+    }
+}
+
+/// The layout for manifest Record:
+/// ```plaintext
+/// +---------+-------------------+------------+-----------------+
+/// | id(u64) | time_range(i64*2) | size(u32)  |  num_rows(u32)  |
+/// +---------+-------------------+------------+-----------------+
+/// ```
+struct SnapshotRecordV1 {
+    id: u64,
+    time_range: TimeRange,
+    size: u32,
+    num_rows: u32,
+}
+
+impl SnapshotRecordV1 {
+    const LENGTH: usize = 8 /*id*/+ 16 /*time range*/ + 4 /*size*/ + 4 /*num 
rows*/;
+    pub const VERSION: u8 = 1;
+
+    pub fn write_to(&self, writer: &mut &mut [u8]) -> Result<()> {
+        ensure!(
+            writer.len() >= SnapshotRecordV1::LENGTH,
+            "writer buf is too small for writing the record, length: {}",
+            writer.len()
+        );
+
+        writer
+            .write_u64::<LittleEndian>(self.id)
+            .context("write shall not fail.")?;
+        writer
+            .write_i64::<LittleEndian>(*self.time_range.start)
+            .context("write shall not fail.")?;
+        writer
+            .write_i64::<LittleEndian>(*self.time_range.end)
+            .context("write shall not fail.")?;
+        writer
+            .write_u32::<LittleEndian>(self.size)
+            .context("write shall not fail.")?;
+        writer
+            .write_u32::<LittleEndian>(self.num_rows)
+            .context("write shall not fail.")?;
+        Ok(())
+    }
+
+    pub fn id(&self) -> u64 {
+        self.id
+    }
+}
+
+impl From<SstFile> for SnapshotRecordV1 {
+    fn from(value: SstFile) -> Self {
+        SnapshotRecordV1 {
+            id: value.id(),
+            time_range: value.meta().time_range.clone(),
+            size: value.meta().size,
+            num_rows: value.meta().num_rows,
+        }
+    }
+}
+
+impl TryFrom<&[u8]> for SnapshotRecordV1 {
+    type Error = Error;
+
+    fn try_from(value: &[u8]) -> Result<Self> {
+        ensure!(
+            value.len() >= SnapshotRecordV1::LENGTH,
+            "invalid value len: {}",
+            value.len()
+        );
+
+        let mut cursor = Cursor::new(value);
+        let id = cursor.read_u64::<LittleEndian>().unwrap();
+        let start = cursor.read_i64::<LittleEndian>().unwrap();
+        let end = cursor.read_i64::<LittleEndian>().unwrap();
+        let size = cursor.read_u32::<LittleEndian>().unwrap();
+        let num_rows = cursor.read_u32::<LittleEndian>().unwrap();
+        Ok(SnapshotRecordV1 {
+            id,
+            time_range: (start..end).into(),
+            size,
+            num_rows,
+        })
+    }
+}
+
+impl From<SnapshotRecordV1> for SstFile {
+    fn from(record: SnapshotRecordV1) -> Self {
+        let file_meta = FileMeta {
+            max_sequence: record.id(),
+            num_rows: record.num_rows,
+            size: record.size,
+            time_range: record.time_range.clone(),
+        };
+        SstFile::new(record.id(), file_meta)
+    }
+}
+
+struct Snapshot {
+    header: SnapshotHeader,
+    inner: Bytes,
+}
+
+impl Default for Snapshot {
+    // create an empty Snapshot
+    fn default() -> Self {
+        let header = SnapshotHeader::new(0);
+        Self {
+            header,
+            inner: Bytes::new(),
+        }
+    }
+}
+
+impl TryFrom<Bytes> for Snapshot {
+    type Error = Error;
+
+    fn try_from(bytes: Bytes) -> Result<Self> {
+        if bytes.is_empty() {
+            return Ok(Snapshot::default());
+        }
+        let header = SnapshotHeader::try_from(bytes.as_bytes())?;
+        let header_length = header.length as usize;
+        ensure!(
+            header_length > 0
+                && header_length % SnapshotRecordV1::LENGTH == 0
+                && header_length + SnapshotHeader::LENGTH == bytes.len(),
+            "create snapshot from bytes failed, invalid bytes, header length = 
{}, total length: {}",
+                header_length,
+                bytes.len()
+        );
+
+        Ok(Self {
+            header,
+            inner: bytes,
+        })
+    }
+}
+
+impl Snapshot {
+    pub fn to_sstfiles(&self) -> Result<Vec<SstFile>> {
+        if self.header.length == 0 {
+            Ok(Vec::new())
+        } else {
+            let buf = self.inner.as_bytes();
+            let mut result: Vec<SstFile> =
+                Vec::with_capacity(self.header.length as usize / 
SnapshotRecordV1::LENGTH);
+            let mut index = SnapshotHeader::LENGTH;
+            while index < buf.len() {
+                let record =
+                    SnapshotRecordV1::try_from(&buf[index..index + 
SnapshotRecordV1::LENGTH])?;
+                index += SnapshotRecordV1::LENGTH;
+                result.push(record.into());
+            }
+
+            Ok(result)
+        }
+    }
+
+    pub fn dedup_sstfiles(&self, sstfiles: &mut Vec<SstFile>) -> Result<()> {
+        let buf = self.inner.as_bytes();
+        let mut ids = HashSet::new();
+        let mut index = SnapshotHeader::LENGTH;
+        while index < buf.len() {
+            let record = SnapshotRecordV1::try_from(&buf[index..index + 
SnapshotRecordV1::LENGTH])?;
+            index += SnapshotRecordV1::LENGTH;
+            ids.insert(record.id());
+        }
+        sstfiles.retain(|item| !ids.contains(&item.id()));
+
+        Ok(())
+    }
+
+    pub fn merge_sstfiles(&mut self, sstfiles: Vec<SstFile>) {
+        // update header
+        self.header.length += (sstfiles.len() * SnapshotRecordV1::LENGTH) as 
u64;
+        // final snapshot
+        let mut snapshot = vec![0u8; SnapshotHeader::LENGTH + 
self.header.length as usize];
+        let mut writer = snapshot.as_mut_slice();
+
+        // write new head
+        self.header.write_to(&mut writer).unwrap();
+        // write old records
+        if !self.inner.is_empty() {
+            writer
+                .write_all(&self.inner.as_bytes()[SnapshotHeader::LENGTH..])
+                .unwrap();
+        }
+        // write new records
+        for sst in sstfiles {
+            let record: SnapshotRecordV1 = sst.into();
+            record.write_to(&mut writer).unwrap();
+        }
+        self.inner = Bytes::from(snapshot);
+    }
+
+    pub fn into_bytes(self) -> Bytes {
+        self.inner
+    }
+}
+
 enum MergeType {
     Hard,
     Soft,
@@ -291,24 +593,13 @@ impl ManifestMerger {
             let sst_file = res.context("Failed to join read delta files 
task")??;
             delta_files.push(sst_file);
         }
-
-        let mut payload = read_snapshot(&self.store, 
&self.snapshot_path).await?;
-        payload.files.extend(delta_files);
-        payload.dedup_files();
-
-        let pb_manifest = pb_types::Manifest {
-            files: payload
-                .files
-                .into_iter()
-                .map(|f| f.into())
-                .collect::<Vec<_>>(),
-        };
-        let mut buf = Vec::with_capacity(pb_manifest.encoded_len());
-        pb_manifest
-            .encode(&mut buf)
-            .context("failed to encode manifest")?;
-        let put_payload = PutPayload::from_bytes(Bytes::from(buf));
-
+        let snapshot_bytes = read_object(&self.store, 
&self.snapshot_path).await?;
+        let mut snapshot = Snapshot::try_from(snapshot_bytes)?;
+        // TODO: no need to dedup every time.
+        snapshot.dedup_sstfiles(&mut delta_files)?;
+        snapshot.merge_sstfiles(delta_files);
+        let snapshot_bytes = snapshot.into_bytes();
+        let put_payload = PutPayload::from_bytes(snapshot_bytes);
         // 1. Persist the snapshot
         self.store
             .put(&self.snapshot_path, put_payload)
@@ -341,6 +632,24 @@ impl ManifestMerger {
     }
 }
 
+async fn read_object(store: &ObjectStoreRef, path: &Path) -> Result<Bytes> {
+    match store.get(path).await {
+        Ok(v) => v
+            .bytes()
+            .await
+            .with_context(|| format!("Failed to read manifest snapshot, 
path:{path}"))
+            .map_err(|e| e.into()),
+        Err(err) => {
+            if err.to_string().contains("not found") {
+                Ok(Bytes::new())
+            } else {
+                let context = format!("Failed to read file, path:{path}");
+                Err(AnyhowError::new(err).context(context).into())
+            }
+        }
+    }
+}
+
 async fn read_snapshot(store: &ObjectStoreRef, path: &Path) -> Result<Payload> 
{
     match store.get(path).await {
         Ok(v) => {
@@ -405,9 +714,10 @@ async fn list_delta_paths(store: &ObjectStoreRef, 
delta_dir: &Path) -> Result<Ve
 
 #[cfg(test)]
 mod tests {
-    use std::{sync::Arc, thread::sleep};
+    use std::sync::Arc;
 
     use object_store::local::LocalFileSystem;
+    use tokio::time::sleep;
 
     use super::*;
 
@@ -500,10 +810,13 @@ mod tests {
         }
 
         // Wait for merge manifest to finish
-        sleep(Duration::from_secs(2));
+        sleep(Duration::from_secs(2)).await;
 
         let mut mem_ssts = manifest.payload.read().await.files.clone();
-        let mut ssts = read_snapshot(&store, 
&snapshot_path).await.unwrap().files;
+        let snapshot = read_object(&store, &snapshot_path).await.unwrap();
+        let snapshot_len = snapshot.len();
+        let payload: Payload = snapshot.try_into().unwrap();
+        let mut ssts = payload.files;
 
         mem_ssts.sort_by_key(|a| a.id());
         ssts.sort_by_key(|a| a.id());
@@ -511,5 +824,102 @@ mod tests {
 
         let delta_paths = list_delta_paths(&store, &delta_dir).await.unwrap();
         assert!(delta_paths.is_empty());
+
+        // Add manifest files again to verify dedup
+        for i in 0..20 {
+            let time_range = (i..i + 1).into();
+            let meta = FileMeta {
+                max_sequence: i as u64,
+                num_rows: i as u32,
+                size: i as u32,
+                time_range,
+            };
+            manifest.add_file(i as u64, meta).await.unwrap();
+        }
+
+        // Wait for merge manifest to finish
+        sleep(Duration::from_secs(2)).await;
+
+        let snapshot_again = read_object(&store, 
&snapshot_path).await.unwrap();
+        assert!(snapshot_len == snapshot_again.len()); // dedup took effect.
+        let delta_paths = list_delta_paths(&store, &delta_dir).await.unwrap();
+        assert!(delta_paths.is_empty());
+    }
+
+    #[test]
+    fn test_snapshot_header() {
+        let header = SnapshotHeader::new(257);
+        let mut vec = vec![0u8; SnapshotHeader::LENGTH];
+        let mut writer = vec.as_mut_slice();
+        header.write_to(&mut writer).unwrap();
+        assert!(writer.is_empty());
+        let mut cursor = Cursor::new(vec);
+
+        assert_eq!(
+            SnapshotHeader::MAGIC,
+            cursor.read_u32::<LittleEndian>().unwrap()
+        );
+        assert_eq!(
+            1, // version
+            cursor.read_u8().unwrap()
+        );
+        assert_eq!(
+            0, // flag
+            cursor.read_u8().unwrap()
+        );
+        assert_eq!(
+            257, // length
+            cursor.read_u64::<LittleEndian>().unwrap()
+        );
+
+        let mut vec = [0u8; SnapshotHeader::LENGTH - 1];
+        let mut writer = vec.as_mut_slice();
+        let result = header.write_to(&mut writer);
+        assert!(result.is_err()); // buf not enough
+    }
+
+    #[test]
+    fn test_snapshot_record() {
+        let sstfile = SstFile::new(
+            99,
+            FileMeta {
+                max_sequence: 99,
+                num_rows: 100,
+                size: 938,
+                time_range: (100..200).into(),
+            },
+        );
+        let record: SnapshotRecordV1 = sstfile.into();
+        let mut vec: Vec<u8> = vec![0u8; SnapshotRecordV1::LENGTH];
+        let mut writer = vec.as_mut_slice();
+        record.write_to(&mut writer).unwrap();
+
+        assert!(writer.is_empty());
+        let mut cursor = Cursor::new(vec);
+
+        assert_eq!(
+            99, // id
+            cursor.read_u64::<LittleEndian>().unwrap()
+        );
+        assert_eq!(
+            100, // start range
+            cursor.read_i64::<LittleEndian>().unwrap()
+        );
+        assert_eq!(
+            200, // end range
+            cursor.read_i64::<LittleEndian>().unwrap()
+        );
+        assert_eq!(
+            938, // size
+            cursor.read_u32::<LittleEndian>().unwrap()
+        );
+        assert_eq!(
+            100, // num rows
+            cursor.read_u32::<LittleEndian>().unwrap()
+        );
+        let mut vec = vec![0u8; SnapshotRecordV1::LENGTH - 1];
+        let mut writer = vec.as_mut_slice();
+        let result = record.write_to(&mut writer);
+        assert!(result.is_err()); // buf not enough
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to