This is an automated email from the ASF dual-hosted git repository.

baojinri pushed a commit to branch main
in repository

The following commit(s) were added to refs/heads/main by this push:
     new cc3352a7 refactor: move encoding struct into independent file (#1611)
cc3352a7 is described below

commit cc3352a7ae501847e3599d89788d4239d46a1475
Author: Jiacai Liu <>
AuthorDate: Wed Dec 18 11:21:30 2024 +0800

    refactor: move encoding struct into independent file (#1611)
    ## Rationale
    Followup #1610, to make manifest more modular.
    ## Detailed Changes
    ## Test Plan
 src/metric_engine/src/compaction/       |   8 +-
 src/metric_engine/src/manifest/         | 348 ++++++++++++++++++++-
 .../src/{ => manifest/}           | 333 +-------------------
 3 files changed, 354 insertions(+), 335 deletions(-)

diff --git a/src/metric_engine/src/compaction/ 
index 9f41ec82..c5cf83e2 100644
--- a/src/metric_engine/src/compaction/
+++ b/src/metric_engine/src/compaction/
@@ -99,7 +99,7 @@ impl Executor {
         let inused = self.inner.inused_memory.load(Ordering::Relaxed);
         let mem_limit = self.inner.mem_limit;
-            inused + task_size > mem_limit,
+            inused + task_size <= mem_limit,
             "Compaction memory usage too high, inused:{inused}, task_size:{task_size}, limit:{mem_limit}"
@@ -113,7 +113,7 @@ impl Executor {
         let task_size = task.input_size();
-            .fetch_add(task_size, Ordering::Relaxed);
+            .fetch_sub(task_size, Ordering::Relaxed);
     pub fn on_failure(&self, task: &Task) {
@@ -137,7 +137,7 @@ impl Executor {
             executor: self.clone(),
+        runnable.spawn()
     // TODO: Merge input sst files into one new sst file
@@ -257,7 +257,7 @@ pub struct Runnable {
 impl Runnable {
-    fn run(self) {
+    fn spawn(self) {
         let rt = self.executor.inner.runtime.clone();
         rt.spawn(async move {
             if let Err(e) = self.executor.do_compaction(&self.task).await {
diff --git a/src/metric_engine/src/manifest/ 
index 01deb7a5..72d43d6e 100644
--- a/src/metric_engine/src/manifest/
+++ b/src/metric_engine/src/manifest/
@@ -15,8 +15,17 @@
 // specific language governing permissions and limitations
 // under the License.
+use std::io::{Cursor, Write};
+use anyhow::Context;
+use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
+use bytes::Bytes;
+use parquet::data_type::AsBytes;
 use crate::{
-    sst::{FileId, SstFile},
+    ensure,
+    sst::{FileId, FileMeta, SstFile},
+    types::TimeRange,
     Error, Result,
@@ -66,3 +75,340 @@ impl From<ManifestUpdate> for pb_types::ManifestUpdate {
+/// The layout for the header.
+/// ```plaintext
+/// +-------------+--------------+------------+--------------+
+/// | magic(u32)  | version(u8)  | flag(u8)   | length(u32)  |
+/// +-------------+--------------+------------+--------------+
+/// ```
+/// - The Magic field (u32) is used to ensure the validity of the data source.
+/// - The Flags field (u8) is reserved for future extensibility, such as
+///   enabling compression or supporting additional features.
+/// - The length field (u64) represents the total length of the subsequent
+///   records and serves as a straightforward method for verifying their
+///   integrity. (length = record_length * record_count)
+pub struct SnapshotHeader {
+    pub magic: u32,
+    pub version: u8,
+    pub flag: u8,
+    pub length: u64,
+impl TryFrom<&[u8]> for SnapshotHeader {
+    type Error = Error;
+    fn try_from(bytes: &[u8]) -> Result<Self> {
+        ensure!(
+            bytes.len() >= Self::LENGTH,
+            "invalid bytes, length: {}",
+            bytes.len()
+        );
+        let mut cursor = Cursor::new(bytes);
+        let magic = cursor
+            .read_u32::<LittleEndian>()
+            .context("read snapshot header magic")?;
+        ensure!(
+            magic == SnapshotHeader::MAGIC,
+            "invalid bytes to convert to header."
+        );
+        let version = cursor.read_u8().context("read snapshot header version")?;
+        let flag = cursor.read_u8().context("read snapshot header flag")?;
+        let length = cursor
+            .read_u64::<LittleEndian>()
+            .context("read snapshot header length")?;
+        Ok(Self {
+            magic,
+            version,
+            flag,
+            length,
+        })
+    }
+impl SnapshotHeader {
+    pub const LENGTH: usize = 4 /*magic*/ + 1 /*version*/ + 1 /*flag*/ + 8 
+    pub const MAGIC: u32 = 0xCAFE_1234;
+    pub fn new(length: u64) -> Self {
+        Self {
+            magic: SnapshotHeader::MAGIC,
+            version: SnapshotRecord::VERSION,
+            flag: 0,
+            length,
+        }
+    }
+    pub fn write_to<W>(&self, mut writer: W) -> Result<()>
+    where
+        W: Write,
+    {
+        writer
+            .write_u32::<LittleEndian>(self.magic)
+            .context("write shall not fail.")?;
+        writer
+            .write_u8(self.version)
+            .context("write shall not fail.")?;
+        writer
+            .write_u8(self.flag)
+            .context("write shall not fail.")?;
+        writer
+            .write_u64::<LittleEndian>(self.length)
+            .context("write shall not fail.")?;
+        Ok(())
+    }
+/// The layout for manifest Record:
+/// ```plaintext
+/// +---------+-------------------+------------+-----------------+
+/// | id(u64) | time_range(i64*2) | size(u32)  |  num_rows(u32)  |
+/// +---------+-------------------+------------+-----------------+
+/// ```
+pub struct SnapshotRecord {
+    id: u64,
+    time_range: TimeRange,
+    size: u32,
+    num_rows: u32,
+impl SnapshotRecord {
+    const LENGTH: usize = 8 /*id*/+ 16 /*time range*/ + 4 /*size*/ + 4 /*num 
+    pub const VERSION: u8 = 1;
+    pub fn write_to<W>(&self, mut writer: W) -> Result<()>
+    where
+        W: Write,
+    {
+        writer
+            .write_u64::<LittleEndian>(self.id)
+            .context("write shall not fail.")?;
+        writer
+            .write_i64::<LittleEndian>(*self.time_range.start)
+            .context("write shall not fail.")?;
+        writer
+            .write_i64::<LittleEndian>(*self.time_range.end)
+            .context("write shall not fail.")?;
+        writer
+            .write_u32::<LittleEndian>(self.size)
+            .context("write shall not fail.")?;
+        writer
+            .write_u32::<LittleEndian>(self.num_rows)
+            .context("write shall not fail.")?;
+        Ok(())
+    }
+    pub fn id(&self) -> u64 {
+        self.id
+    }
+impl From<SstFile> for SnapshotRecord {
+    fn from(value: SstFile) -> Self {
+        SnapshotRecord {
+            id:,
+            time_range: value.meta().time_range.clone(),
+            size: value.meta().size,
+            num_rows: value.meta().num_rows,
+        }
+    }
+impl TryFrom<&[u8]> for SnapshotRecord {
+    type Error = Error;
+    fn try_from(value: &[u8]) -> Result<Self> {
+        ensure!(
+            value.len() >= SnapshotRecord::LENGTH,
+            "invalid value len: {}",
+            value.len()
+        );
+        let mut cursor = Cursor::new(value);
+        let id = cursor
+            .read_u64::<LittleEndian>()
+            .context("read record id")?;
+        let start = cursor
+            .read_i64::<LittleEndian>()
+            .context("read record start")?;
+        let end = cursor
+            .read_i64::<LittleEndian>()
+            .context("read record end")?;
+        let size = cursor
+            .read_u32::<LittleEndian>()
+            .context("read record size")?;
+        let num_rows = cursor
+            .read_u32::<LittleEndian>()
+            .context("read record num_rows")?;
+        Ok(SnapshotRecord {
+            id,
+            time_range: (start..end).into(),
+            size,
+            num_rows,
+        })
+    }
+impl From<SnapshotRecord> for SstFile {
+    fn from(record: SnapshotRecord) -> Self {
+        let file_meta = FileMeta {
+            max_sequence: record.id,
+            num_rows: record.num_rows,
+            size: record.size,
+            time_range: record.time_range.clone(),
+        };
+        SstFile::new(record.id, file_meta)
+    }
+pub struct Snapshot {
+    header: SnapshotHeader,
+    records: Vec<SnapshotRecord>,
+impl Default for Snapshot {
+    // create an empty Snapshot
+    fn default() -> Self {
+        let header = SnapshotHeader::new(0);
+        Self {
+            header,
+            records: Vec::new(),
+        }
+    }
+impl TryFrom<Bytes> for Snapshot {
+    type Error = Error;
+    fn try_from(bytes: Bytes) -> Result<Self> {
+        if bytes.is_empty() {
+            return Ok(Snapshot::default());
+        }
+        let header = SnapshotHeader::try_from(bytes.as_bytes())?;
+        let record_total_length = header.length as usize;
+        ensure!(
+            record_total_length > 0
+                && record_total_length % SnapshotRecord::LENGTH == 0
+                && record_total_length + SnapshotHeader::LENGTH == bytes.len(),
+            "create snapshot from bytes failed, header:{header:?}, bytes_length: {}",
+            bytes.len()
+        );
+        let mut index = SnapshotHeader::LENGTH;
+        let mut records = Vec::with_capacity(record_total_length / SnapshotRecord::LENGTH);
+        while index < bytes.len() {
+            let record = SnapshotRecord::try_from(&bytes[index..index + SnapshotRecord::LENGTH])?;
+            records.push(record);
+            index += SnapshotRecord::LENGTH;
+        }
+        Ok(Self { header, records })
+    }
+impl Snapshot {
+    pub fn into_ssts(self) -> Vec<SstFile> {
+        if self.header.length == 0 {
+            Vec::new()
+        } else {
+            self.records.into_iter().map(|r| r.into()).collect()
+        }
+    }
+    // TODO: Ensure no files duplicated
+    //
+    pub fn merge_update(&mut self, update: ManifestUpdate) -> Result<()> {
+        self.records
+            .extend(update.to_adds.into_iter().map(SnapshotRecord::from));
+        self.records
+            .retain(|record| !update.to_deletes.contains(&record.id));
+        self.header.length = (self.records.len() * SnapshotRecord::LENGTH) as u64;
+        Ok(())
+    }
+    pub fn into_bytes(self) -> Result<Bytes> {
+        let buf = Vec::with_capacity(self.header.length as usize + SnapshotHeader::LENGTH);
+        let mut cursor = Cursor::new(buf);
+        self.header.write_to(&mut cursor)?;
+        for record in self.records {
+            record.write_to(&mut cursor)?;
+        }
+        Ok(Bytes::from(cursor.into_inner()))
+    }
+mod tests {
+    use super::*;
+    #[test]
+    fn test_snapshot_header() {
+        let header = SnapshotHeader::new(257);
+        let mut vec = vec![0u8; SnapshotHeader::LENGTH];
+        let mut writer = vec.as_mut_slice();
+        header.write_to(&mut writer).unwrap();
+        assert!(writer.is_empty());
+        let mut cursor = Cursor::new(vec);
+        assert_eq!(
+            SnapshotHeader::MAGIC,
+            cursor.read_u32::<LittleEndian>().unwrap()
+        );
+        assert_eq!(
+            1, // version
+            cursor.read_u8().unwrap()
+        );
+        assert_eq!(
+            0, // flag
+            cursor.read_u8().unwrap()
+        );
+        assert_eq!(
+            257, // length
+            cursor.read_u64::<LittleEndian>().unwrap()
+        );
+    }
+    #[test]
+    fn test_snapshot_record() {
+        let sstfile = SstFile::new(
+            99,
+            FileMeta {
+                max_sequence: 99,
+                num_rows: 100,
+                size: 938,
+                time_range: (100..200).into(),
+            },
+        );
+        let record: SnapshotRecord = sstfile.into();
+        let mut vec: Vec<u8> = vec![0u8; SnapshotRecord::LENGTH];
+        let mut writer = vec.as_mut_slice();
+        record.write_to(&mut writer).unwrap();
+        assert!(writer.is_empty());
+        let mut cursor = Cursor::new(vec);
+        assert_eq!(
+            99, // id
+            cursor.read_u64::<LittleEndian>().unwrap()
+        );
+        assert_eq!(
+            100, // start range
+            cursor.read_i64::<LittleEndian>().unwrap()
+        );
+        assert_eq!(
+            200, // end range
+            cursor.read_i64::<LittleEndian>().unwrap()
+        );
+        assert_eq!(
+            938, // size
+            cursor.read_u32::<LittleEndian>().unwrap()
+        );
+        assert_eq!(
+            100, // num rows
+            cursor.read_u32::<LittleEndian>().unwrap()
+        );
+    }
diff --git a/src/metric_engine/src/ 
similarity index 59%
rename from src/metric_engine/src/
rename to src/metric_engine/src/manifest/
index 17b188d6..ffc45567 100644
--- a/src/metric_engine/src/
+++ b/src/metric_engine/src/manifest/
@@ -17,7 +17,6 @@
 mod encoding;
 use std::{
-    io::{Cursor, Write},
         atomic::{AtomicUsize, Ordering},
@@ -27,12 +26,11 @@ use std::{
 use anyhow::Context;
 use async_scoped::TokioScope;
-use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
 use bytes::Bytes;
 pub use encoding::ManifestUpdate;
+use encoding::Snapshot;
 use futures::{StreamExt, TryStreamExt};
 use object_store::{path::Path, PutPayload};
-use parquet::data_type::AsBytes;
 use prost::Message;
 use tokio::sync::{
     mpsc::{self, Receiver, Sender},
@@ -42,10 +40,9 @@ use tracing::error;
 use uuid::Uuid;
 use crate::{
-    ensure,
     sst::{FileId, FileMeta, SstFile},
     types::{ManifestMergeOptions, ObjectStoreRef, RuntimeRef, TimeRange},
-    AnyhowError, Error, Result,
+    AnyhowError, Result,
 pub const PREFIX_PATH: &str = "manifest";
@@ -152,257 +149,6 @@ impl Manifest {
-/// The layout for the header.
-/// ```plaintext
-/// +-------------+--------------+------------+--------------+
-/// | magic(u32)  | version(u8)  | flag(u8)   | length(u32)  |
-/// +-------------+--------------+------------+--------------+
-/// ```
-/// - The Magic field (u32) is used to ensure the validity of the data source.
-/// - The Flags field (u8) is reserved for future extensibility, such as
-///   enabling compression or supporting additional features.
-/// - The length field (u64) represents the total length of the subsequent
-///   records and serves as a straightforward method for verifying their
-///   integrity. (length = record_length * record_count)
-struct SnapshotHeader {
-    pub magic: u32,
-    pub version: u8,
-    pub flag: u8,
-    pub length: u64,
-impl TryFrom<&[u8]> for SnapshotHeader {
-    type Error = Error;
-    fn try_from(bytes: &[u8]) -> Result<Self> {
-        ensure!(
-            bytes.len() >= Self::LENGTH,
-            "invalid bytes, length: {}",
-            bytes.len()
-        );
-        let mut cursor = Cursor::new(bytes);
-        let magic = cursor.read_u32::<LittleEndian>().unwrap();
-        ensure!(
-            magic == SnapshotHeader::MAGIC,
-            "invalid bytes to convert to header."
-        );
-        let version = cursor.read_u8().unwrap();
-        let flag = cursor.read_u8().unwrap();
-        let length = cursor.read_u64::<LittleEndian>().unwrap();
-        Ok(Self {
-            magic,
-            version,
-            flag,
-            length,
-        })
-    }
-impl SnapshotHeader {
-    pub const LENGTH: usize = 4 /*magic*/ + 1 /*version*/ + 1 /*flag*/ + 8 
-    pub const MAGIC: u32 = 0xCAFE_6666;
-    pub fn new(length: u64) -> Self {
-        Self {
-            magic: SnapshotHeader::MAGIC,
-            version: SnapshotRecordV1::VERSION,
-            flag: 0,
-            length,
-        }
-    }
-    pub fn write_to<W>(&self, mut writer: W) -> Result<()>
-    where
-        W: Write,
-    {
-        writer
-            .write_u32::<LittleEndian>(self.magic)
-            .context("write shall not fail.")?;
-        writer
-            .write_u8(self.version)
-            .context("write shall not fail.")?;
-        writer
-            .write_u8(self.flag)
-            .context("write shall not fail.")?;
-        writer
-            .write_u64::<LittleEndian>(self.length)
-            .context("write shall not fail.")?;
-        Ok(())
-    }
-/// The layout for manifest Record:
-/// ```plaintext
-/// +---------+-------------------+------------+-----------------+
-/// | id(u64) | time_range(i64*2) | size(u32)  |  num_rows(u32)  |
-/// +---------+-------------------+------------+-----------------+
-/// ```
-struct SnapshotRecordV1 {
-    id: u64,
-    time_range: TimeRange,
-    size: u32,
-    num_rows: u32,
-impl SnapshotRecordV1 {
-    const LENGTH: usize = 8 /*id*/+ 16 /*time range*/ + 4 /*size*/ + 4 /*num 
-    pub const VERSION: u8 = 1;
-    pub fn write_to<W>(&self, mut writer: W) -> Result<()>
-    where
-        W: Write,
-    {
-        writer
-            .write_u64::<LittleEndian>(
-            .context("write shall not fail.")?;
-        writer
-            .write_i64::<LittleEndian>(*self.time_range.start)
-            .context("write shall not fail.")?;
-        writer
-            .write_i64::<LittleEndian>(*self.time_range.end)
-            .context("write shall not fail.")?;
-        writer
-            .write_u32::<LittleEndian>(self.size)
-            .context("write shall not fail.")?;
-        writer
-            .write_u32::<LittleEndian>(self.num_rows)
-            .context("write shall not fail.")?;
-        Ok(())
-    }
-    pub fn id(&self) -> u64 {
-    }
-impl From<SstFile> for SnapshotRecordV1 {
-    fn from(value: SstFile) -> Self {
-        SnapshotRecordV1 {
-            id:,
-            time_range: value.meta().time_range.clone(),
-            size: value.meta().size,
-            num_rows: value.meta().num_rows,
-        }
-    }
-impl TryFrom<&[u8]> for SnapshotRecordV1 {
-    type Error = Error;
-    fn try_from(value: &[u8]) -> Result<Self> {
-        ensure!(
-            value.len() >= SnapshotRecordV1::LENGTH,
-            "invalid value len: {}",
-            value.len()
-        );
-        let mut cursor = Cursor::new(value);
-        let id = cursor.read_u64::<LittleEndian>().unwrap();
-        let start = cursor.read_i64::<LittleEndian>().unwrap();
-        let end = cursor.read_i64::<LittleEndian>().unwrap();
-        let size = cursor.read_u32::<LittleEndian>().unwrap();
-        let num_rows = cursor.read_u32::<LittleEndian>().unwrap();
-        Ok(SnapshotRecordV1 {
-            id,
-            time_range: (start..end).into(),
-            size,
-            num_rows,
-        })
-    }
-impl From<SnapshotRecordV1> for SstFile {
-    fn from(record: SnapshotRecordV1) -> Self {
-        let file_meta = FileMeta {
-            max_sequence:,
-            num_rows: record.num_rows,
-            size: record.size,
-            time_range: record.time_range.clone(),
-        };
-        SstFile::new(, file_meta)
-    }
-struct Snapshot {
-    header: SnapshotHeader,
-    records: Vec<SnapshotRecordV1>,
-impl Default for Snapshot {
-    // create an empty Snapshot
-    fn default() -> Self {
-        let header = SnapshotHeader::new(0);
-        Self {
-            header,
-            records: Vec::new(),
-        }
-    }
-impl TryFrom<Bytes> for Snapshot {
-    type Error = Error;
-    fn try_from(bytes: Bytes) -> Result<Self> {
-        if bytes.is_empty() {
-            return Ok(Snapshot::default());
-        }
-        let header = SnapshotHeader::try_from(bytes.as_bytes())?;
-        let record_total_length = header.length as usize;
-        ensure!(
-            record_total_length > 0
-                && record_total_length % SnapshotRecordV1::LENGTH == 0
-                && record_total_length + SnapshotHeader::LENGTH == bytes.len(),
-            "create snapshot from bytes failed, header:{header:?}, bytes_length: {}",
-            bytes.len()
-        );
-        let mut index = SnapshotHeader::LENGTH;
-        let mut records = Vec::with_capacity(record_total_length / 
-        while index < bytes.len() {
-            let record =
-                SnapshotRecordV1::try_from(&bytes[index..index + 
-            records.push(record);
-            index += SnapshotRecordV1::LENGTH;
-        }
-        Ok(Self { header, records })
-    }
-impl Snapshot {
-    pub fn into_ssts(self) -> Vec<SstFile> {
-        if self.header.length == 0 {
-            Vec::new()
-        } else {
-            self.records.into_iter().map(|r| r.into()).collect()
-        }
-    }
-    // TODO: Ensure no files duplicated
-    //
-    pub fn merge_update(&mut self, update: ManifestUpdate) -> Result<()> {
-        self.records
-            .extend(update.to_adds.into_iter().map(SnapshotRecordV1::from));
-        self.records
-            .retain(|record| !update.to_deletes.contains(&;
-        self.header.length = (self.records.len() * SnapshotRecordV1::LENGTH) as u64;
-        Ok(())
-    }
-    pub fn into_bytes(self) -> Result<Bytes> {
-        let buf = Vec::with_capacity(self.header.length as usize + 
-        let mut cursor = Cursor::new(buf);
-        self.header.write_to(&mut cursor)?;
-        for record in self.records {
-            record.write_to(&mut cursor).unwrap();
-        }
-        Ok(Bytes::from(cursor.into_inner()))
-    }
 enum MergeType {
@@ -605,7 +351,6 @@ async fn list_delta_paths(store: &ObjectStoreRef, delta_dir: &Path) -> Result<Ve
 mod tests {
     use std::sync::Arc;
-    use itertools::Itertools;
     use object_store::local::LocalFileSystem;
     use tokio::time::sleep;
@@ -705,11 +450,7 @@ mod tests {
             let mut mem_ssts =;
             let snapshot = read_snapshot(&store, 
-            let mut ssts = snapshot
-                .records
-                .into_iter()
-                .map(SstFile::from)
-                .collect_vec();
+            let mut ssts = snapshot.into_ssts();
@@ -719,72 +460,4 @@ mod tests {
-    #[test]
-    fn test_snapshot_header() {
-        let header = SnapshotHeader::new(257);
-        let mut vec = vec![0u8; SnapshotHeader::LENGTH];
-        let mut writer = vec.as_mut_slice();
-        header.write_to(&mut writer).unwrap();
-        assert!(writer.is_empty());
-        let mut cursor = Cursor::new(vec);
-        assert_eq!(
-            SnapshotHeader::MAGIC,
-            cursor.read_u32::<LittleEndian>().unwrap()
-        );
-        assert_eq!(
-            1, // version
-            cursor.read_u8().unwrap()
-        );
-        assert_eq!(
-            0, // flag
-            cursor.read_u8().unwrap()
-        );
-        assert_eq!(
-            257, // length
-            cursor.read_u64::<LittleEndian>().unwrap()
-        );
-    }
-    #[test]
-    fn test_snapshot_record() {
-        let sstfile = SstFile::new(
-            99,
-            FileMeta {
-                max_sequence: 99,
-                num_rows: 100,
-                size: 938,
-                time_range: (100..200).into(),
-            },
-        );
-        let record: SnapshotRecordV1 = sstfile.into();
-        let mut vec: Vec<u8> = vec![0u8; SnapshotRecordV1::LENGTH];
-        let mut writer = vec.as_mut_slice();
-        record.write_to(&mut writer).unwrap();
-        assert!(writer.is_empty());
-        let mut cursor = Cursor::new(vec);
-        assert_eq!(
-            99, // id
-            cursor.read_u64::<LittleEndian>().unwrap()
-        );
-        assert_eq!(
-            100, // start range
-            cursor.read_i64::<LittleEndian>().unwrap()
-        );
-        assert_eq!(
-            200, // end range
-            cursor.read_i64::<LittleEndian>().unwrap()
-        );
-        assert_eq!(
-            938, // size
-            cursor.read_u32::<LittleEndian>().unwrap()
-        );
-        assert_eq!(
-            100, // num rows
-            cursor.read_u32::<LittleEndian>().unwrap()
-        );
-    }

To unsubscribe, e-mail:
For additional commands, e-mail:

Reply via email to