This is an automated email from the ASF dual-hosted git repository.

baojinri pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/horaedb.git


The following commit(s) were added to refs/heads/main by this push:
     new 03b1df93 feat: manifest support delete (#1610)
03b1df93 is described below

commit 03b1df935b1df0aafbc40c1909805d8ca64e4063
Author: Jiacai Liu <[email protected]>
AuthorDate: Wed Dec 18 10:43:20 2024 +0800

    feat: manifest support delete (#1610)
    
    ## Rationale
    When compact finished, we need to delete the old input sst and expired
    sst.
    
    ## Detailed Changes
    - The delta file use `ManifestUpdate` struct.
    - Refactor compact scheduler, to make it more modular.
    ## Test Plan
    CI
---
 .github/workflows/ci.yml                           |   4 +-
 Makefile                                           |   3 +-
 .../src/compaction/{runner.rs => executor.rs}      | 159 +++++--
 src/metric_engine/src/compaction/mod.rs            |  13 +-
 src/metric_engine/src/compaction/picker.rs         |  56 ++-
 src/metric_engine/src/compaction/scheduler.rs      | 109 ++---
 src/metric_engine/src/lib.rs                       |   1 +
 src/metric_engine/src/manifest.rs                  | 491 +++++++--------------
 src/metric_engine/src/manifest/encoding.rs         |  68 +++
 src/metric_engine/src/sst.rs                       |   4 +
 src/metric_engine/src/storage.rs                   | 171 +++----
 src/metric_engine/src/{lib.rs => util.rs}          |  22 +-
 src/pb_types/protos/sst.proto                      |   8 +-
 13 files changed, 564 insertions(+), 545 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index c88221af..73775bc6 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -67,6 +67,7 @@ jobs:
       - name: Install check binaries
         run: |
           cargo install --git https://github.com/DevinR528/cargo-sort --rev 
55ec890 --locked
+      - uses: Swatinem/rust-cache@v2
       - name: Run Style Check
         run: |
           make fmt sort clippy
@@ -80,8 +81,6 @@ jobs:
     timeout-minutes: 60
     steps:
       - uses: actions/checkout@v4
-        with:
-          submodules: true
       - name: Release Disk Quota
         run: |
           sudo make ensure-disk-quota
@@ -89,6 +88,7 @@ jobs:
         run: |
           sudo apt update
           sudo apt install --yes protobuf-compiler
+      - uses: Swatinem/rust-cache@v2
       - name: Run Unit Tests
         run: |
           make test
diff --git a/Makefile b/Makefile
index b39aeb18..6b4f287f 100644
--- a/Makefile
+++ b/Makefile
@@ -47,8 +47,7 @@ udeps:
        cd $(DIR); cargo udeps --all-targets --all-features --workspace
 
 clippy:
-       cd $(DIR); cargo clippy --all-targets --all-features --workspace -- -D 
warnings -D clippy::dbg-macro \
-       -A dead_code -A unused_variables -A clippy::unreachable -A 
clippy::too_many_arguments # Remove these once we have a clean build
+       cd $(DIR); cargo clippy --all-targets --all-features --workspace -- -D 
warnings -D clippy::dbg-macro -A clippy::too-many-arguments
 
 ensure-disk-quota:
        bash ./scripts/free-disk-space.sh
diff --git a/src/metric_engine/src/compaction/runner.rs 
b/src/metric_engine/src/compaction/executor.rs
similarity index 55%
rename from src/metric_engine/src/compaction/runner.rs
rename to src/metric_engine/src/compaction/executor.rs
index ce6f170e..9f41ec82 100644
--- a/src/metric_engine/src/compaction/runner.rs
+++ b/src/metric_engine/src/compaction/executor.rs
@@ -15,7 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::sync::Arc;
+use std::sync::{
+    atomic::{AtomicU64, Ordering},
+    Arc,
+};
 
 use anyhow::Context;
 use arrow::array::{RecordBatch, UInt64Array};
@@ -31,45 +34,59 @@ use tracing::error;
 
 use crate::{
     compaction::Task,
-    manifest::ManifestRef,
+    ensure,
+    manifest::{ManifestRef, ManifestUpdate},
     read::ParquetReader,
-    sst::{allocate_id, FileMeta, SstPathGenerator},
-    types::{ObjectStoreRef, StorageSchema},
+    sst::{allocate_id, FileMeta, SstFile, SstPathGenerator},
+    types::{ObjectStoreRef, RuntimeRef, StorageSchema},
     Result,
 };
 
 #[derive(Clone)]
-pub struct Runner {
+pub struct Executor {
+    inner: Arc<Inner>,
+}
+
+struct Inner {
+    runtime: RuntimeRef,
     store: ObjectStoreRef,
     schema: StorageSchema,
     manifest: ManifestRef,
     sst_path_gen: Arc<SstPathGenerator>,
     parquet_reader: Arc<ParquetReader>,
     write_props: WriterProperties,
+    inused_memory: AtomicU64,
+    mem_limit: u64,
 }
 
-impl Runner {
+impl Executor {
     pub fn new(
+        runtime: RuntimeRef,
         store: ObjectStoreRef,
         schema: StorageSchema,
         manifest: ManifestRef,
         sst_path_gen: Arc<SstPathGenerator>,
         parquet_reader: Arc<ParquetReader>,
         write_props: WriterProperties,
+        mem_limit: u64,
     ) -> Self {
-        Self {
+        let inner = Inner {
+            runtime,
             store,
             schema,
             manifest,
             sst_path_gen,
             parquet_reader,
             write_props,
+            mem_limit,
+            inused_memory: AtomicU64::new(0),
+        };
+        Self {
+            inner: Arc::new(inner),
         }
     }
 
-    // TODO: Merge input sst files into one new sst file
-    // and delete the expired sst files
-    pub async fn do_compaction(&self, task: Task) -> Result<()> {
+    fn pre_check(&self, task: &Task) -> Result<()> {
         assert!(!task.inputs.is_empty());
         for f in &task.inputs {
             assert!(f.is_compaction());
@@ -77,24 +94,77 @@ impl Runner {
         for f in &task.expireds {
             assert!(f.is_compaction());
         }
+
+        let task_size = task.input_size();
+        let inused = self.inner.inused_memory.load(Ordering::Relaxed);
+        let mem_limit = self.inner.mem_limit;
+        ensure!(
+            inused + task_size > mem_limit,
+            "Compaction memory usage too high, inused:{inused}, 
task_size:{task_size}, limit:{mem_limit}"
+        );
+
+        self.inner
+            .inused_memory
+            .fetch_add(task.input_size(), Ordering::Relaxed);
+        Ok(())
+    }
+
+    pub fn on_success(&self, task: &Task) {
+        let task_size = task.input_size();
+        self.inner
+            .inused_memory
+            .fetch_add(task_size, Ordering::Relaxed);
+    }
+
+    pub fn on_failure(&self, task: &Task) {
+        let task_size = task.input_size();
+        self.inner
+            .inused_memory
+            .fetch_sub(task_size, Ordering::Relaxed);
+
+        // When task execution fails, unmark sst so they can be
+        // reschduled.
+        for sst in &task.inputs {
+            sst.unmark_compaction();
+        }
+        for sst in &task.expireds {
+            sst.unmark_compaction();
+        }
+    }
+
+    pub fn submit(&self, task: Task) {
+        let runnable = Runnable {
+            executor: self.clone(),
+            task,
+        };
+        runnable.run()
+    }
+
+    // TODO: Merge input sst files into one new sst file
+    // and delete the expired sst files
+    pub async fn do_compaction(&self, task: &Task) -> Result<()> {
+        self.pre_check(task)?;
+
         let mut time_range = task.inputs[0].meta().time_range.clone();
         for f in &task.inputs[1..] {
             time_range.merge(&f.meta().time_range);
         }
-        let plan = self
-            .parquet_reader
-            .build_df_plan(task.inputs.clone(), None, Vec::new())?;
+        let plan =
+            self.inner
+                .parquet_reader
+                .build_df_plan(task.inputs.clone(), None, Vec::new())?;
         let mut stream = execute_stream(plan, Arc::new(TaskContext::default()))
             .context("execute datafusion plan")?;
 
         let file_id = allocate_id();
-        let file_path = self.sst_path_gen.generate(file_id);
+        let file_path = self.inner.sst_path_gen.generate(file_id);
         let file_path = Path::from(file_path);
-        let object_store_writer = ParquetObjectWriter::new(self.store.clone(), 
file_path.clone());
+        let object_store_writer =
+            ParquetObjectWriter::new(self.inner.store.clone(), 
file_path.clone());
         let mut writer = AsyncArrowWriter::try_new(
             object_store_writer,
-            self.schema.arrow_schema.clone(),
-            Some(self.write_props.clone()),
+            self.inner.schema.arrow_schema.clone(),
+            Some(self.inner.write_props.clone()),
         )
         .context("create arrow writer")?;
         let mut num_rows = 0;
@@ -107,7 +177,7 @@ impl Runner {
                 // Since file_id in increasing order, we can use it as 
sequence.
                 let seq_column = Arc::new(UInt64Array::from(vec![file_id; 
batch.num_rows()]));
                 new_cols.push(seq_column);
-                RecordBatch::try_new(self.schema.arrow_schema.clone(), 
new_cols)
+                RecordBatch::try_new(self.inner.schema.arrow_schema.clone(), 
new_cols)
                     .context("construct record batch with seq column")?
             };
 
@@ -115,6 +185,7 @@ impl Runner {
         }
         writer.close().await.context("close writer")?;
         let object_meta = self
+            .inner
             .store
             .head(&file_path)
             .await
@@ -126,28 +197,37 @@ impl Runner {
             time_range: time_range.clone(),
         };
         // First add new sst to manifest, then delete expired/old sst
-        self.manifest.add_file(file_id, file_meta).await?;
-        self.manifest
-            .add_tombstone_files(task.expireds.clone())
-            .await?;
-        self.manifest
-            .add_tombstone_files(task.inputs.clone())
+        let to_adds = vec![SstFile::new(file_id, file_meta)];
+        let to_deletes = task
+            .expireds
+            .iter()
+            .map(|f| f.id())
+            .chain(task.inputs.iter().map(|f| f.id()))
+            .collect();
+        self.inner
+            .manifest
+            .update(ManifestUpdate::new(to_adds, to_deletes))
             .await?;
 
+        // From now on, no error should be returned!
+        // Because we have already updated manifest.
+
         let (_, results) = TokioScope::scope_and_block(|scope| {
-            for file in task.expireds {
-                let path = Path::from(self.sst_path_gen.generate(file.id()));
+            for file in &task.expireds {
+                let path = 
Path::from(self.inner.sst_path_gen.generate(file.id()));
                 scope.spawn(async move {
-                    self.store
+                    self.inner
+                        .store
                         .delete(&path)
                         .await
                         .with_context(|| format!("failed to delete file, 
path:{path}"))
                 });
             }
-            for file in task.inputs {
-                let path = Path::from(self.sst_path_gen.generate(file.id()));
+            for file in &task.inputs {
+                let path = 
Path::from(self.inner.sst_path_gen.generate(file.id()));
                 scope.spawn(async move {
-                    self.store
+                    self.inner
+                        .store
                         .delete(&path)
                         .await
                         .with_context(|| format!("failed to delete file, 
path:{path}"))
@@ -170,3 +250,22 @@ impl Runner {
         Ok(())
     }
 }
+
+pub struct Runnable {
+    executor: Executor,
+    task: Task,
+}
+
+impl Runnable {
+    fn run(self) {
+        let rt = self.executor.inner.runtime.clone();
+        rt.spawn(async move {
+            if let Err(e) = self.executor.do_compaction(&self.task).await {
+                error!("Do compaction failed, err:{e}");
+                self.executor.on_failure(&self.task);
+            } else {
+                self.executor.on_success(&self.task);
+            }
+        });
+    }
+}
diff --git a/src/metric_engine/src/compaction/mod.rs 
b/src/metric_engine/src/compaction/mod.rs
index 62beda0a..38a37ebe 100644
--- a/src/metric_engine/src/compaction/mod.rs
+++ b/src/metric_engine/src/compaction/mod.rs
@@ -15,21 +15,22 @@
 // specific language governing permissions and limitations
 // under the License.
 
+mod executor;
 mod picker;
-mod runner;
 mod scheduler;
 
 pub use scheduler::{Scheduler as CompactionScheduler, SchedulerConfig};
 
 use crate::sst::SstFile;
 
-#[derive(Debug, Clone, PartialEq, Eq)]
-pub struct Input {
-    files: Vec<SstFile>,
-}
-
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub struct Task {
     pub inputs: Vec<SstFile>,
     pub expireds: Vec<SstFile>,
 }
+
+impl Task {
+    pub fn input_size(&self) -> u64 {
+        self.inputs.iter().map(|f| f.size() as u64).sum()
+    }
+}
diff --git a/src/metric_engine/src/compaction/picker.rs 
b/src/metric_engine/src/compaction/picker.rs
index 913ad317..9107e3ba 100644
--- a/src/metric_engine/src/compaction/picker.rs
+++ b/src/metric_engine/src/compaction/picker.rs
@@ -19,19 +19,56 @@ use std::{collections::BTreeMap, time::Duration};
 
 use tracing::debug;
 
-use super::SchedulerConfig;
-use crate::{compaction::Task, sst::SstFile, types::Timestamp};
+use crate::{compaction::Task, manifest::ManifestRef, sst::SstFile, 
types::Timestamp, util::now};
+
+pub struct Picker {
+    manifest: ManifestRef,
+    ttl: Option<Duration>,
+    strategy: TimeWindowCompactionStrategy,
+}
+
+impl Picker {
+    pub fn new(
+        manifest: ManifestRef,
+        ttl: Option<Duration>,
+        segment_duration: Duration,
+        new_sst_max_size: u64,
+        input_sst_max_num: usize,
+    ) -> Self {
+        Self {
+            manifest,
+            ttl,
+            strategy: TimeWindowCompactionStrategy::new(
+                segment_duration,
+                new_sst_max_size,
+                input_sst_max_num,
+            ),
+        }
+    }
+
+    pub async fn pick_candidate(&self) -> Option<Task> {
+        let ssts = self.manifest.all_ssts().await;
+        let expire_time = self.ttl.map(|ttl| (now() - ttl.as_micros() as 
i64).into());
+        self.strategy.pick_candidate(ssts, expire_time)
+    }
+}
 
 pub struct TimeWindowCompactionStrategy {
     segment_duration: Duration,
-    config: SchedulerConfig,
+    new_sst_max_size: u64,
+    input_sst_max_num: usize,
 }
 
 impl TimeWindowCompactionStrategy {
-    pub fn new(segment_duration: Duration, config: SchedulerConfig) -> Self {
+    pub fn new(
+        segment_duration: Duration,
+        new_sst_max_size: u64,
+        input_sst_max_num: usize,
+    ) -> Self {
         Self {
             segment_duration,
-            config,
+            new_sst_max_size,
+            input_sst_max_num,
         }
     }
 
@@ -120,12 +157,12 @@ impl TimeWindowCompactionStrategy {
             files.sort_unstable_by_key(SstFile::size);
 
             let mut input_size = 0;
-            let memory_limit = self.config.memory_limit;
-            let compaction_files_limit = self.config.compaction_files_limit;
+            // Suppose the comaction will reduce the size of files by 10%.
+            let memory_limit = (self.new_sst_max_size as f64 * 1.1) as u64;
 
             let compaction_files = files
                 .into_iter()
-                .take(compaction_files_limit)
+                .take(self.input_sst_max_num)
                 .take_while(|f| {
                     input_size += f.size() as u64;
                     input_size <= memory_limit
@@ -154,8 +191,7 @@ mod tests {
     #[test]
     fn test_pick_candidate() {
         let segment_duration = Duration::from_millis(20);
-        let config = SchedulerConfig::default();
-        let strategy = TimeWindowCompactionStrategy::new(segment_duration, 
config);
+        let strategy = TimeWindowCompactionStrategy::new(segment_duration, 
9999, 10);
 
         let ssts = (0_i64..5_i64)
             .map(|i| {
diff --git a/src/metric_engine/src/compaction/scheduler.rs 
b/src/metric_engine/src/compaction/scheduler.rs
index 5e2c407a..4832bf96 100644
--- a/src/metric_engine/src/compaction/scheduler.rs
+++ b/src/metric_engine/src/compaction/scheduler.rs
@@ -15,12 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::{
-    sync::{atomic::AtomicU64, Arc},
-    time::Duration,
-};
+use std::{sync::Arc, time::Duration};
 
-use anyhow::Context;
 use parquet::file::properties::WriterProperties;
 use tokio::{
     sync::mpsc::{self, Receiver, Sender},
@@ -29,21 +25,20 @@ use tokio::{
 };
 use tracing::warn;
 
-use super::runner::Runner;
+use super::{executor::Executor, picker::Picker};
 use crate::{
-    compaction::{picker::TimeWindowCompactionStrategy, Task},
+    compaction::Task,
     manifest::ManifestRef,
     read::ParquetReader,
     sst::SstPathGenerator,
     types::{ObjectStoreRef, RuntimeRef, StorageSchema},
-    Result,
 };
 
+#[allow(dead_code)]
 pub struct Scheduler {
     runtime: RuntimeRef,
 
     task_tx: Sender<Task>,
-    inused_memory: AtomicU64,
     task_handle: JoinHandle<()>,
     picker_handle: JoinHandle<()>,
 }
@@ -61,29 +56,35 @@ impl Scheduler {
     ) -> Self {
         let (task_tx, task_rx) = 
mpsc::channel(config.max_pending_compaction_tasks);
         let task_handle = {
-            let rt = runtime.clone();
             let store = store.clone();
             let manifest = manifest.clone();
             let write_props = config.write_props.clone();
+            let executor = Executor::new(
+                runtime.clone(),
+                store,
+                schema,
+                manifest,
+                sst_path_gen,
+                parquet_reader,
+                write_props,
+                config.memory_limit,
+            );
+
             runtime.spawn(async move {
-                Self::recv_task_loop(
-                    rt,
-                    task_rx,
-                    store,
-                    schema,
-                    manifest,
-                    sst_path_gen,
-                    parquet_reader,
-                    config.memory_limit,
-                    write_props,
-                )
-                .await;
+                Self::recv_task_loop(task_rx, executor).await;
             })
         };
         let picker_handle = {
             let task_tx = task_tx.clone();
             runtime.spawn(async move {
-                Self::generate_task_loop(manifest, task_tx, segment_duration, 
config).await;
+                let picker = Picker::new(
+                    manifest,
+                    config.ttl,
+                    segment_duration,
+                    config.new_sst_max_size,
+                    config.input_sst_max_num,
+                );
+                Self::generate_task_loop(task_tx, picker, 
config.schedule_interval).await;
             })
         };
 
@@ -92,60 +93,22 @@ impl Scheduler {
             task_tx,
             task_handle,
             picker_handle,
-            inused_memory: AtomicU64::new(0),
         }
     }
 
-    pub fn try_send(&self, task: Task) -> Result<()> {
-        self.task_tx
-            .try_send(task)
-            .context("failed to send task to scheduler")?;
-
-        Ok(())
-    }
-
-    async fn recv_task_loop(
-        rt: RuntimeRef,
-        mut task_rx: Receiver<Task>,
-        store: ObjectStoreRef,
-        schema: StorageSchema,
-        manifest: ManifestRef,
-        sst_path_gen: Arc<SstPathGenerator>,
-        parquet_reader: Arc<ParquetReader>,
-        _mem_limit: u64,
-        write_props: WriterProperties,
-    ) {
-        let runner = Runner::new(
-            store,
-            schema,
-            manifest,
-            sst_path_gen,
-            parquet_reader,
-            write_props,
-        );
+    async fn recv_task_loop(mut task_rx: Receiver<Task>, executor: Executor) {
         while let Some(task) = task_rx.recv().await {
-            let runner = runner.clone();
-            rt.spawn(async move {
-                if let Err(e) = runner.do_compaction(task).await {
-                    warn!("Do compaction failed, err:{e}");
-                }
-            });
+            executor.submit(task);
         }
     }
 
     async fn generate_task_loop(
-        manifest: ManifestRef,
         task_tx: Sender<Task>,
-        segment_duration: Duration,
-        config: SchedulerConfig,
+        picker: Picker,
+        schedule_interval: Duration,
     ) {
-        let schedule_interval = config.schedule_interval;
-        let compactor = TimeWindowCompactionStrategy::new(segment_duration, 
config);
-        // TODO: obtain expire time
-        let expire_time = None;
         loop {
-            let ssts = manifest.all_ssts().await;
-            if let Some(task) = compactor.pick_candidate(ssts, expire_time) {
+            if let Some(task) = picker.pick_candidate().await {
                 if let Err(e) = task_tx.try_send(task) {
                     warn!("Send task failed, err:{e}");
                 }
@@ -159,20 +122,26 @@ impl Scheduler {
 #[derive(Clone)]
 pub struct SchedulerConfig {
     pub schedule_interval: Duration,
-    pub memory_limit: u64,
     pub max_pending_compaction_tasks: usize,
-    pub compaction_files_limit: usize,
+    // Runner config
+    pub memory_limit: u64,
     pub write_props: WriterProperties,
+    // Picker config
+    pub ttl: Option<Duration>,
+    pub new_sst_max_size: u64,
+    pub input_sst_max_num: usize,
 }
 
 impl Default for SchedulerConfig {
     fn default() -> Self {
         Self {
             schedule_interval: Duration::from_secs(30),
-            memory_limit: bytesize::gb(2_u64),
             max_pending_compaction_tasks: 10,
-            compaction_files_limit: 10,
+            memory_limit: bytesize::gb(3_u64),
             write_props: WriterProperties::default(),
+            ttl: None,
+            new_sst_max_size: bytesize::gb(1_u64),
+            input_sst_max_num: 10,
         }
     }
 }
diff --git a/src/metric_engine/src/lib.rs b/src/metric_engine/src/lib.rs
index 26580cb3..c9e12359 100644
--- a/src/metric_engine/src/lib.rs
+++ b/src/metric_engine/src/lib.rs
@@ -29,5 +29,6 @@ pub mod storage;
 #[cfg(test)]
 mod test_util;
 pub mod types;
+pub(crate) mod util;
 
 pub use error::{AnyhowError, Error, Result};
diff --git a/src/metric_engine/src/manifest.rs 
b/src/metric_engine/src/manifest.rs
index 1a50b90e..17b188d6 100644
--- a/src/metric_engine/src/manifest.rs
+++ b/src/metric_engine/src/manifest.rs
@@ -15,8 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+mod encoding;
 use std::{
-    collections::HashSet,
     io::{Cursor, Write},
     sync::{
         atomic::{AtomicUsize, Ordering},
@@ -29,16 +29,14 @@ use anyhow::Context;
 use async_scoped::TokioScope;
 use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
 use bytes::Bytes;
+pub use encoding::ManifestUpdate;
 use futures::{StreamExt, TryStreamExt};
 use object_store::{path::Path, PutPayload};
 use parquet::data_type::AsBytes;
 use prost::Message;
-use tokio::{
-    runtime::Runtime,
-    sync::{
-        mpsc::{self, Receiver, Sender},
-        RwLock,
-    },
+use tokio::sync::{
+    mpsc::{self, Receiver, Sender},
+    RwLock,
 };
 use tracing::error;
 use uuid::Uuid;
@@ -53,89 +51,31 @@ use crate::{
 pub const PREFIX_PATH: &str = "manifest";
 pub const SNAPSHOT_FILENAME: &str = "snapshot";
 pub const DELTA_PREFIX: &str = "delta";
-pub const TOMBSTONE_PREFIX: &str = "tombstone";
 
 pub type ManifestRef = Arc<Manifest>;
 
 pub struct Manifest {
     delta_dir: Path,
-    tombstone_dir: Path,
     store: ObjectStoreRef,
     merger: Arc<ManifestMerger>,
 
-    payload: RwLock<Payload>,
-}
-
-#[derive(Default)]
-pub struct Payload {
-    files: Vec<SstFile>,
-}
-
-impl Payload {
-    // TODO: we could sort sst files by name asc, then the dedup will be more
-    // efficient
-    pub fn dedup_files(&mut self) {
-        let mut seen = HashSet::with_capacity(self.files.len());
-        self.files.retain(|file| seen.insert(file.id()));
-    }
-}
-
-impl TryFrom<Bytes> for Payload {
-    type Error = Error;
-
-    fn try_from(value: Bytes) -> Result<Self> {
-        if value.is_empty() {
-            Ok(Self::default())
-        } else {
-            let snapshot = Snapshot::try_from(value)?;
-            let files = snapshot.to_sstfiles()?;
-            Ok(Self { files })
-        }
-    }
-}
-
-impl TryFrom<pb_types::Manifest> for Payload {
-    type Error = Error;
-
-    fn try_from(value: pb_types::Manifest) -> Result<Self> {
-        let files = value
-            .files
-            .into_iter()
-            .map(SstFile::try_from)
-            .collect::<Result<Vec<_>>>()?;
-
-        Ok(Self { files })
-    }
-}
-
-impl From<Payload> for pb_types::Manifest {
-    fn from(value: Payload) -> Self {
-        pb_types::Manifest {
-            files: value
-                .files
-                .into_iter()
-                .map(pb_types::SstFile::from)
-                .collect(),
-        }
-    }
+    ssts: RwLock<Vec<SstFile>>,
 }
 
 impl Manifest {
     pub async fn try_new(
         root_dir: String,
         store: ObjectStoreRef,
-        runtime: Arc<Runtime>,
+        runtime: RuntimeRef,
         merge_options: ManifestMergeOptions,
     ) -> Result<Self> {
         let snapshot_path = 
Path::from(format!("{root_dir}/{PREFIX_PATH}/{SNAPSHOT_FILENAME}"));
         let delta_dir = 
Path::from(format!("{root_dir}/{PREFIX_PATH}/{DELTA_PREFIX}"));
-        let tombstone_dir = 
Path::from(format!("{root_dir}/{PREFIX_PATH}/{TOMBSTONE_PREFIX}"));
 
         let merger = ManifestMerger::try_new(
             snapshot_path.clone(),
             delta_dir.clone(),
             store.clone(),
-            runtime.clone(),
             merge_options,
         )
         .await?;
@@ -148,42 +88,46 @@ impl Manifest {
             });
         }
 
-        let bytes = read_object(&store, &snapshot_path).await?;
-        // TODO: add upgrade logic
-        let payload = bytes.try_into()?;
+        let snapshot = read_snapshot(&store, &snapshot_path).await?;
+        let ssts = snapshot.into_ssts();
 
         Ok(Self {
             delta_dir,
-            tombstone_dir,
             store,
             merger,
-            payload: RwLock::new(payload),
+            ssts: RwLock::new(ssts),
         })
     }
 
     pub async fn add_file(&self, id: FileId, meta: FileMeta) -> Result<()> {
-        self.merger.maybe_schedule_merge().await?;
-
-        let new_sst_path = Path::from(format!("{}/{id}", self.delta_dir));
-        let new_sst = SstFile::new(id, meta);
+        let update = ManifestUpdate::new(vec![SstFile::new(id, meta)], 
Vec::new());
+        self.update(update).await
+    }
 
-        let new_sst_payload = pb_types::SstFile::from(new_sst.clone());
-        let mut buf: Vec<u8> = 
Vec::with_capacity(new_sst_payload.encoded_len());
-        new_sst_payload
+    pub async fn update(&self, update: ManifestUpdate) -> Result<()> {
+        self.merger.maybe_schedule_merge().await?;
+        let path = Path::from(format!("{}/{}", self.delta_dir, 
Uuid::new_v4()));
+        let pb_update = pb_types::ManifestUpdate::from(update.clone());
+        let mut buf: Vec<u8> = Vec::with_capacity(pb_update.encoded_len());
+        pb_update
             .encode(&mut buf)
-            .context("failed to encode manifest file")?;
-        let put_payload = PutPayload::from_bytes(Bytes::from(buf));
+            .context("failed to encode manifest update")?;
 
         // 1. Persist the delta manifest
         self.store
-            .put(&new_sst_path, put_payload)
+            .put(&path, PutPayload::from_bytes(Bytes::from(buf)))
             .await
-            .with_context(|| format!("Failed to write delta manifest, 
path:{}", new_sst_path))?;
+            .with_context(|| format!("Failed to write delta manifest, 
path:{}", path))?;
 
         // 2. Update cached payload
         {
-            let mut payload = self.payload.write().await;
-            payload.files.push(new_sst);
+            let mut ssts = self.ssts.write().await;
+            for file in update.to_adds {
+                ssts.push(file);
+            }
+            // TODO: sort files in payload, so we can delete files more
+            // efficiently.
+            ssts.retain(|file| !update.to_deletes.contains(&file.id()));
         }
 
         // 3. Update delta files num
@@ -192,27 +136,16 @@ impl Manifest {
         Ok(())
     }
 
-    // TODO: recovery tombstone files when startup
-    pub async fn add_tombstone_files<I>(&self, files: I) -> Result<()>
-    where
-        I: IntoIterator<Item = SstFile>,
-    {
-        let path = Path::from(format!("{}/{}", self.tombstone_dir, 
Uuid::new_v4()));
-        todo!()
-    }
-
     // TODO: avoid clone
     pub async fn all_ssts(&self) -> Vec<SstFile> {
-        let payload = self.payload.read().await;
-        payload.files.clone()
+        let ssts = self.ssts.read().await;
+        ssts.clone()
     }
 
     pub async fn find_ssts(&self, time_range: &TimeRange) -> Vec<SstFile> {
-        let payload = self.payload.read().await;
+        let ssts = self.ssts.read().await;
 
-        payload
-            .files
-            .iter()
+        ssts.iter()
             .filter(move |f| f.meta().time_range.overlaps(time_range))
             .cloned()
             .collect()
@@ -231,6 +164,7 @@ impl Manifest {
 /// - The length field (u64) represents the total length of the subsequent
 ///   records and serves as a straightforward method for verifying their
 ///   integrity. (length = record_length * record_count)
+#[derive(Debug)]
 struct SnapshotHeader {
     pub magic: u32,
     pub version: u8,
@@ -279,13 +213,10 @@ impl SnapshotHeader {
         }
     }
 
-    pub fn write_to(&self, writer: &mut &mut [u8]) -> Result<()> {
-        ensure!(
-            writer.len() >= SnapshotHeader::LENGTH,
-            "writer buf is too small for writing the header, length: {}",
-            writer.len()
-        );
-
+    pub fn write_to<W>(&self, mut writer: W) -> Result<()>
+    where
+        W: Write,
+    {
         writer
             .write_u32::<LittleEndian>(self.magic)
             .context("write shall not fail.")?;
@@ -319,13 +250,10 @@ impl SnapshotRecordV1 {
     const LENGTH: usize = 8 /*id*/+ 16 /*time range*/ + 4 /*size*/ + 4 /*num 
rows*/;
     pub const VERSION: u8 = 1;
 
-    pub fn write_to(&self, writer: &mut &mut [u8]) -> Result<()> {
-        ensure!(
-            writer.len() >= SnapshotRecordV1::LENGTH,
-            "writer buf is too small for writing the record, length: {}",
-            writer.len()
-        );
-
+    pub fn write_to<W>(&self, mut writer: W) -> Result<()>
+    where
+        W: Write,
+    {
         writer
             .write_u64::<LittleEndian>(self.id)
             .context("write shall not fail.")?;
@@ -399,7 +327,7 @@ impl From<SnapshotRecordV1> for SstFile {
 
 struct Snapshot {
     header: SnapshotHeader,
-    inner: Bytes,
+    records: Vec<SnapshotRecordV1>,
 }
 
 impl Default for Snapshot {
@@ -408,7 +336,7 @@ impl Default for Snapshot {
         let header = SnapshotHeader::new(0);
         Self {
             header,
-            inner: Bytes::new(),
+            records: Vec::new(),
         }
     }
 }
@@ -421,82 +349,57 @@ impl TryFrom<Bytes> for Snapshot {
             return Ok(Snapshot::default());
         }
         let header = SnapshotHeader::try_from(bytes.as_bytes())?;
-        let header_length = header.length as usize;
+        let record_total_length = header.length as usize;
         ensure!(
-            header_length > 0
-                && header_length % SnapshotRecordV1::LENGTH == 0
-                && header_length + SnapshotHeader::LENGTH == bytes.len(),
-            "create snapshot from bytes failed, invalid bytes, header length = 
{}, total length: {}",
-                header_length,
-                bytes.len()
+            record_total_length > 0
+                && record_total_length % SnapshotRecordV1::LENGTH == 0
+                && record_total_length + SnapshotHeader::LENGTH == bytes.len(),
+            "create snapshot from bytes failed, header:{header:?}, 
bytes_length: {}",
+            bytes.len()
         );
+        let mut index = SnapshotHeader::LENGTH;
+        let mut records = Vec::with_capacity(record_total_length / 
SnapshotRecordV1::LENGTH);
+        while index < bytes.len() {
+            let record =
+                SnapshotRecordV1::try_from(&bytes[index..index + 
SnapshotRecordV1::LENGTH])?;
+            records.push(record);
+            index += SnapshotRecordV1::LENGTH;
+        }
 
-        Ok(Self {
-            header,
-            inner: bytes,
-        })
+        Ok(Self { header, records })
     }
 }
 
 impl Snapshot {
-    pub fn to_sstfiles(&self) -> Result<Vec<SstFile>> {
+    pub fn into_ssts(self) -> Vec<SstFile> {
         if self.header.length == 0 {
-            Ok(Vec::new())
+            Vec::new()
         } else {
-            let buf = self.inner.as_bytes();
-            let mut result: Vec<SstFile> =
-                Vec::with_capacity(self.header.length as usize / 
SnapshotRecordV1::LENGTH);
-            let mut index = SnapshotHeader::LENGTH;
-            while index < buf.len() {
-                let record =
-                    SnapshotRecordV1::try_from(&buf[index..index + 
SnapshotRecordV1::LENGTH])?;
-                index += SnapshotRecordV1::LENGTH;
-                result.push(record.into());
-            }
-
-            Ok(result)
+            self.records.into_iter().map(|r| r.into()).collect()
         }
     }
 
-    pub fn dedup_sstfiles(&self, sstfiles: &mut Vec<SstFile>) -> Result<()> {
-        let buf = self.inner.as_bytes();
-        let mut ids = HashSet::new();
-        let mut index = SnapshotHeader::LENGTH;
-        while index < buf.len() {
-            let record = SnapshotRecordV1::try_from(&buf[index..index + 
SnapshotRecordV1::LENGTH])?;
-            index += SnapshotRecordV1::LENGTH;
-            ids.insert(record.id());
-        }
-        sstfiles.retain(|item| !ids.contains(&item.id()));
+    // TODO: Ensure no files duplicated
+    // https://github.com/apache/horaedb/issues/1608
+    pub fn merge_update(&mut self, update: ManifestUpdate) -> Result<()> {
+        self.records
+            .extend(update.to_adds.into_iter().map(SnapshotRecordV1::from));
+        self.records
+            .retain(|record| !update.to_deletes.contains(&record.id));
 
+        self.header.length = (self.records.len() * SnapshotRecordV1::LENGTH) 
as u64;
         Ok(())
     }
 
-    pub fn merge_sstfiles(&mut self, sstfiles: Vec<SstFile>) {
-        // update header
-        self.header.length += (sstfiles.len() * SnapshotRecordV1::LENGTH) as 
u64;
-        // final snapshot
-        let mut snapshot = vec![0u8; SnapshotHeader::LENGTH + 
self.header.length as usize];
-        let mut writer = snapshot.as_mut_slice();
-
-        // write new head
-        self.header.write_to(&mut writer).unwrap();
-        // write old records
-        if !self.inner.is_empty() {
-            writer
-                .write_all(&self.inner.as_bytes()[SnapshotHeader::LENGTH..])
-                .unwrap();
-        }
-        // write new records
-        for sst in sstfiles {
-            let record: SnapshotRecordV1 = sst.into();
-            record.write_to(&mut writer).unwrap();
-        }
-        self.inner = Bytes::from(snapshot);
-    }
+    pub fn into_bytes(self) -> Result<Bytes> {
+        let buf = Vec::with_capacity(self.header.length as usize + 
SnapshotHeader::LENGTH);
+        let mut cursor = Cursor::new(buf);
 
-    pub fn into_bytes(self) -> Bytes {
-        self.inner
+        self.header.write_to(&mut cursor)?;
+        for record in self.records {
+            record.write_to(&mut cursor).unwrap();
+        }
+        Ok(Bytes::from(cursor.into_inner()))
     }
 }
 
@@ -509,7 +412,6 @@ struct ManifestMerger {
     snapshot_path: Path,
     delta_dir: Path,
     store: ObjectStoreRef,
-    runtime: RuntimeRef,
     sender: Sender<MergeType>,
     receiver: RwLock<Receiver<MergeType>>,
     deltas_num: AtomicUsize,
@@ -521,7 +423,6 @@ impl ManifestMerger {
         snapshot_path: Path,
         delta_dir: Path,
         store: ObjectStoreRef,
-        runtime: Arc<Runtime>,
         merge_options: ManifestMergeOptions,
     ) -> Result<Arc<Self>> {
         let (tx, rx) = mpsc::channel(merge_options.channel_size);
@@ -529,7 +430,6 @@ impl ManifestMerger {
             snapshot_path,
             delta_dir,
             store,
-            runtime,
             sender: tx,
             receiver: RwLock::new(rx),
             deltas_num: AtomicUsize::new(0),
@@ -602,17 +502,12 @@ impl ManifestMerger {
             }
         });
 
-        let mut delta_files = Vec::with_capacity(results.len());
+        let mut snapshot = read_snapshot(&self.store, 
&self.snapshot_path).await?;
         for res in results {
-            let sst_file = res.context("Failed to join read delta files 
task")??;
-            delta_files.push(sst_file);
+            let manifest_update = res.context("Failed to join read delta files 
task")??;
+            snapshot.merge_update(manifest_update)?;
         }
-        let snapshot_bytes = read_object(&self.store, 
&self.snapshot_path).await?;
-        let mut snapshot = Snapshot::try_from(snapshot_bytes)?;
-        // TODO: no need to dedup every time.
-        snapshot.dedup_sstfiles(&mut delta_files)?;
-        snapshot.merge_sstfiles(delta_files);
-        let snapshot_bytes = snapshot.into_bytes();
+        let snapshot_bytes = snapshot.into_bytes()?;
         let put_payload = PutPayload::from_bytes(snapshot_bytes);
         // 1. Persist the snapshot
         self.store
@@ -646,38 +541,18 @@ impl ManifestMerger {
     }
 }
 
-async fn read_object(store: &ObjectStoreRef, path: &Path) -> Result<Bytes> {
-    match store.get(path).await {
-        Ok(v) => v
-            .bytes()
-            .await
-            .with_context(|| format!("Failed to read manifest snapshot, 
path:{path}"))
-            .map_err(|e| e.into()),
-        Err(err) => {
-            if err.to_string().contains("not found") {
-                Ok(Bytes::new())
-            } else {
-                let context = format!("Failed to read file, path:{path}");
-                Err(AnyhowError::new(err).context(context).into())
-            }
-        }
-    }
-}
-
-async fn read_snapshot(store: &ObjectStoreRef, path: &Path) -> Result<Payload> 
{
+async fn read_snapshot(store: &ObjectStoreRef, path: &Path) -> 
Result<Snapshot> {
     match store.get(path).await {
         Ok(v) => {
             let bytes = v
                 .bytes()
                 .await
                 .with_context(|| format!("Failed to read manifest snapshot, 
path:{path}"))?;
-            let pb_payload = pb_types::Manifest::decode(bytes)
-                .with_context(|| format!("Failed to decode manifest snapshot, 
path:{path}"))?;
-            Payload::try_from(pb_payload)
+            Snapshot::try_from(bytes)
         }
         Err(err) => {
             if err.to_string().contains("not found") {
-                Ok(Payload { files: vec![] })
+                Ok(Snapshot::default())
             } else {
                 let context = format!("Failed to read manifest snapshot, 
path:{path}");
                 Err(AnyhowError::new(err).context(context).into())
@@ -686,7 +561,7 @@ async fn read_snapshot(store: &ObjectStoreRef, path: &Path) 
-> Result<Payload> {
     }
 }
 
-async fn read_delta_file(store: &ObjectStoreRef, sst_path: &Path) -> 
Result<SstFile> {
+async fn read_delta_file(store: &ObjectStoreRef, sst_path: &Path) -> 
Result<ManifestUpdate> {
     let bytes = store
         .get(sst_path)
         .await
@@ -695,12 +570,12 @@ async fn read_delta_file(store: &ObjectStoreRef, 
sst_path: &Path) -> Result<SstF
         .await
         .with_context(|| format!("failed to read delta file, 
path:{sst_path}"))?;
 
-    let pb_sst = pb_types::SstFile::decode(bytes)
+    let pb_update = pb_types::ManifestUpdate::decode(bytes)
         .with_context(|| format!("failed to decode delta file, 
path:{sst_path}"))?;
 
-    let sst = SstFile::try_from(pb_sst)
+    let update = ManifestUpdate::try_from(pb_update)
         .with_context(|| format!("failed to convert delta file, 
path:{sst_path}"))?;
-    Ok(sst)
+    Ok(update)
 }
 
 async fn delete_delta_file(store: &ObjectStoreRef, path: &Path) -> Result<()> {
@@ -730,43 +605,30 @@ async fn list_delta_paths(store: &ObjectStoreRef, 
delta_dir: &Path) -> Result<Ve
 mod tests {
     use std::sync::Arc;
 
+    use itertools::Itertools;
     use object_store::local::LocalFileSystem;
     use tokio::time::sleep;
 
     use super::*;
 
-    #[tokio::test]
-    async fn test_find_manifest() {
+    #[test]
+    fn test_find_manifest() {
         let root_dir = temp_dir::TempDir::new().unwrap();
-        let runtime = tokio::runtime::Runtime::new().unwrap();
+        let runtime = Arc::new(tokio::runtime::Runtime::new().unwrap());
+        let rt = runtime.clone();
         let store = Arc::new(LocalFileSystem::new());
 
-        let manifest = Manifest::try_new(
-            root_dir.path().to_string_lossy().to_string(),
-            store,
-            Arc::new(runtime),
-            ManifestMergeOptions::default(),
-        )
-        .await
-        .unwrap();
-
-        for i in 0..20 {
-            let time_range = (i..i + 1).into();
-            let meta = FileMeta {
-                max_sequence: i as u64,
-                num_rows: i as u32,
-                size: i as u32,
-                time_range,
-            };
-            manifest.add_file(i as u64, meta).await.unwrap();
-        }
-
-        let find_range = (10..15).into();
-        let mut ssts = manifest.find_ssts(&find_range).await;
+        rt.block_on(async move {
+            let manifest = Manifest::try_new(
+                root_dir.path().to_string_lossy().to_string(),
+                store,
+                runtime.clone(),
+                ManifestMergeOptions::default(),
+            )
+            .await
+            .unwrap();
 
-        let mut expected_ssts = (10..15)
-            .map(|i| {
-                let id = i as u64;
+            for i in 0..20 {
                 let time_range = (i..i + 1).into();
                 let meta = FileMeta {
                     max_sequence: i as u64,
@@ -774,17 +636,34 @@ mod tests {
                     size: i as u32,
                     time_range,
                 };
-                SstFile::new(id, meta)
-            })
-            .collect::<Vec<_>>();
+                manifest.add_file(i as u64, meta).await.unwrap();
+            }
 
-        expected_ssts.sort_by_key(|a| a.id());
-        ssts.sort_by_key(|a| a.id());
-        assert_eq!(expected_ssts, ssts);
+            let find_range = (10..15).into();
+            let mut ssts = manifest.find_ssts(&find_range).await;
+
+            let mut expected_ssts = (10..15)
+                .map(|i| {
+                    let id = i as u64;
+                    let time_range = (i..i + 1).into();
+                    let meta = FileMeta {
+                        max_sequence: i as u64,
+                        num_rows: i as u32,
+                        size: i as u32,
+                        time_range,
+                    };
+                    SstFile::new(id, meta)
+                })
+                .collect::<Vec<_>>();
+
+            expected_ssts.sort_by_key(|a| a.id());
+            ssts.sort_by_key(|a| a.id());
+            assert_eq!(expected_ssts, ssts);
+        });
     }
 
-    #[tokio::test]
-    async fn test_merge_manifest() {
+    #[test]
+    fn test_merge_manifest() {
         let root_dir = temp_dir::TempDir::new()
             .unwrap()
             .path()
@@ -792,72 +671,53 @@ mod tests {
             .to_string();
         let snapshot_path = 
Path::from(format!("{root_dir}/{PREFIX_PATH}/{SNAPSHOT_FILENAME}"));
         let delta_dir = 
Path::from(format!("{root_dir}/{PREFIX_PATH}/{DELTA_PREFIX}"));
-        let runtime = tokio::runtime::Builder::new_multi_thread()
-            .worker_threads(4)
-            .enable_all()
-            .build()
+        let runtime = Arc::new(tokio::runtime::Runtime::new().unwrap());
+        let rt = runtime.clone();
+
+        rt.block_on(async move {
+            let store: ObjectStoreRef = Arc::new(LocalFileSystem::new());
+            let manifest = Manifest::try_new(
+                root_dir,
+                store.clone(),
+                runtime.clone(),
+                ManifestMergeOptions {
+                    merge_interval_seconds: 1,
+                    ..Default::default()
+                },
+            )
+            .await
             .unwrap();
-        let store: ObjectStoreRef = Arc::new(LocalFileSystem::new());
 
-        let manifest = Manifest::try_new(
-            root_dir,
-            store.clone(),
-            Arc::new(runtime),
-            ManifestMergeOptions {
-                merge_interval_seconds: 1,
-                ..Default::default()
-            },
-        )
-        .await
-        .unwrap();
-
-        // Add manifest files
-        for i in 0..20 {
-            let time_range = (i..i + 1).into();
-            let meta = FileMeta {
-                max_sequence: i as u64,
-                num_rows: i as u32,
-                size: i as u32,
-                time_range,
-            };
-            manifest.add_file(i as u64, meta).await.unwrap();
-        }
+            // Add manifest files
+            for i in 0..20 {
+                let time_range = (i..i + 1).into();
+                let meta = FileMeta {
+                    max_sequence: i as u64,
+                    num_rows: i as u32,
+                    size: i as u32,
+                    time_range,
+                };
+                manifest.add_file(i as u64, meta).await.unwrap();
+            }
 
-        // Wait for merge manifest to finish
-        sleep(Duration::from_secs(2)).await;
-
-        let mut mem_ssts = manifest.payload.read().await.files.clone();
-        let snapshot = read_object(&store, &snapshot_path).await.unwrap();
-        let snapshot_len = snapshot.len();
-        let payload: Payload = snapshot.try_into().unwrap();
-        let mut ssts = payload.files;
-
-        mem_ssts.sort_by_key(|a| a.id());
-        ssts.sort_by_key(|a| a.id());
-        assert_eq!(mem_ssts, ssts);
-
-        let delta_paths = list_delta_paths(&store, &delta_dir).await.unwrap();
-        assert!(delta_paths.is_empty());
-
-        // Add manifest files again to verify dedup
-        for i in 0..20 {
-            let time_range = (i..i + 1).into();
-            let meta = FileMeta {
-                max_sequence: i as u64,
-                num_rows: i as u32,
-                size: i as u32,
-                time_range,
-            };
-            manifest.add_file(i as u64, meta).await.unwrap();
-        }
+            // Wait for merge manifest to finish
+            sleep(Duration::from_secs(2)).await;
 
-        // Wait for merge manifest to finish
-        sleep(Duration::from_secs(2)).await;
+            let mut mem_ssts = manifest.ssts.read().await.clone();
+            let snapshot = read_snapshot(&store, 
&snapshot_path).await.unwrap();
+            let mut ssts = snapshot
+                .records
+                .into_iter()
+                .map(SstFile::from)
+                .collect_vec();
+
+            mem_ssts.sort_by_key(|a| a.id());
+            ssts.sort_by_key(|a| a.id());
+            assert_eq!(mem_ssts, ssts);
 
-        let snapshot_again = read_object(&store, 
&snapshot_path).await.unwrap();
-        assert!(snapshot_len == snapshot_again.len()); // dedup took effect.
-        let delta_paths = list_delta_paths(&store, &delta_dir).await.unwrap();
-        assert!(delta_paths.is_empty());
+            let delta_paths = list_delta_paths(&store, 
&delta_dir).await.unwrap();
+            assert!(delta_paths.is_empty());
+        })
     }
 
     #[test]
@@ -885,11 +745,6 @@ mod tests {
             257, // length
             cursor.read_u64::<LittleEndian>().unwrap()
         );
-
-        let mut vec = [0u8; SnapshotHeader::LENGTH - 1];
-        let mut writer = vec.as_mut_slice();
-        let result = header.write_to(&mut writer);
-        assert!(result.is_err()); // buf not enough
     }
 
     #[test]
@@ -931,9 +786,5 @@ mod tests {
             100, // num rows
             cursor.read_u32::<LittleEndian>().unwrap()
         );
-        let mut vec = vec![0u8; SnapshotRecordV1::LENGTH - 1];
-        let mut writer = vec.as_mut_slice();
-        let result = record.write_to(&mut writer);
-        assert!(result.is_err()); // buf not enough
     }
 }
diff --git a/src/metric_engine/src/manifest/encoding.rs 
b/src/metric_engine/src/manifest/encoding.rs
new file mode 100644
index 00000000..01deb7a5
--- /dev/null
+++ b/src/metric_engine/src/manifest/encoding.rs
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::{
+    sst::{FileId, SstFile},
+    Error, Result,
+};
+
+#[derive(Clone, Debug)]
+pub struct ManifestUpdate {
+    pub to_adds: Vec<SstFile>,
+    pub to_deletes: Vec<FileId>,
+}
+
+impl ManifestUpdate {
+    pub fn new(to_adds: Vec<SstFile>, to_deletes: Vec<FileId>) -> Self {
+        Self {
+            to_adds,
+            to_deletes,
+        }
+    }
+}
+
+impl TryFrom<pb_types::ManifestUpdate> for ManifestUpdate {
+    type Error = Error;
+
+    fn try_from(value: pb_types::ManifestUpdate) -> Result<Self> {
+        let to_adds = value
+            .to_adds
+            .into_iter()
+            .map(SstFile::try_from)
+            .collect::<Result<Vec<_>>>()?;
+
+        Ok(Self {
+            to_adds,
+            to_deletes: value.to_deletes,
+        })
+    }
+}
+
+impl From<ManifestUpdate> for pb_types::ManifestUpdate {
+    fn from(value: ManifestUpdate) -> Self {
+        let to_adds = value
+            .to_adds
+            .into_iter()
+            .map(pb_types::SstFile::from)
+            .collect();
+
+        pb_types::ManifestUpdate {
+            to_adds,
+            to_deletes: value.to_deletes,
+        }
+    }
+}
diff --git a/src/metric_engine/src/sst.rs b/src/metric_engine/src/sst.rs
index 1cbfedec..070a62ed 100644
--- a/src/metric_engine/src/sst.rs
+++ b/src/metric_engine/src/sst.rs
@@ -74,6 +74,10 @@ impl SstFile {
         self.inner.in_compaction.store(true, Ordering::Relaxed);
     }
 
+    pub fn unmark_compaction(&self) {
+        self.inner.in_compaction.store(false, Ordering::Relaxed);
+    }
+
     pub fn is_compaction(&self) -> bool {
         self.inner.in_compaction.load(Ordering::Relaxed)
     }
diff --git a/src/metric_engine/src/storage.rs b/src/metric_engine/src/storage.rs
index 37cc904e..f15d67ff 100644
--- a/src/metric_engine/src/storage.rs
+++ b/src/metric_engine/src/storage.rs
@@ -125,6 +125,7 @@ impl StorageRuntimes {
 ///
 /// Compaction will be done by merging segments within a segment, and segment
 /// will make it easy to support expiration.
+#[allow(dead_code)]
 pub struct CloudObjectStorage {
     segment_duration: Duration,
     path: String,
@@ -150,7 +151,7 @@ pub struct CloudObjectStorage {
 /// ```
 /// `root_path` is composed of `path` and `segment_duration`.
 impl CloudObjectStorage {
-    pub async fn try_new(
+    pub fn try_new(
         path: String,
         segment_duration: Duration,
         store: ObjectStoreRef,
@@ -159,15 +160,16 @@ impl CloudObjectStorage {
         storage_opts: StorageOptions,
     ) -> Result<Self> {
         let runtimes = StorageRuntimes::new(storage_opts.runtime_opts)?;
-        let manifest = Arc::new(
+        let manifest = runtimes.manifest_compact_runtime.block_on(async {
             Manifest::try_new(
                 path.clone(),
                 store.clone(),
                 runtimes.manifest_compact_runtime.clone(),
                 storage_opts.manifest_merge_opts,
             )
-            .await?,
-        );
+            .await
+        })?;
+        let manifest = Arc::new(manifest);
         let schema = {
             let value_idxes = 
(num_primary_keys..arrow_schema.fields.len()).collect::<Vec<_>>();
             ensure!(!value_idxes.is_empty(), "no value column found");
@@ -426,8 +428,8 @@ mod tests {
     use super::*;
     use crate::{arrow_schema, record_batch, test_util::check_stream, 
types::Timestamp};
 
-    #[test(tokio::test)]
-    async fn test_storage_write_and_scan() {
+    #[test(test)]
+    fn test_storage_write_and_scan() {
         let schema = arrow_schema!(("pk1", UInt8), ("pk2", UInt8), ("value", 
Int64));
         let root_dir = temp_dir::TempDir::new().unwrap();
         let store = Arc::new(LocalFileSystem::new());
@@ -439,67 +441,68 @@ mod tests {
             2, // num_primary_keys
             StorageOptions::default(),
         )
-        .await
-        .unwrap();
-
-        let batch = record_batch!(
-            ("pk1", UInt8, vec![11, 11, 9, 10, 5]),
-            ("pk2", UInt8, vec![100, 100, 1, 2, 3]),
-            ("value", Int64, vec![2, 7, 4, 6, 1])
-        )
         .unwrap();
-        storage
-            .write(WriteRequest {
-                batch,
-                time_range: (1..10).into(),
-                enable_check: true,
-            })
-            .await
-            .unwrap();
 
-        let batch = record_batch!(
-            ("pk1", UInt8, vec![11, 11, 9, 10]),
-            ("pk2", UInt8, vec![100, 99, 1, 2]),
-            ("value", Int64, vec![22, 77, 44, 66])
-        )
-        .unwrap();
-        storage
-            .write(WriteRequest {
-                batch,
-                time_range: (10..20).into(),
-                enable_check: true,
-            })
-            .await
+        storage.runtimes.sst_compact_runtime.block_on(async {
+            let batch = record_batch!(
+                ("pk1", UInt8, vec![11, 11, 9, 10, 5]),
+                ("pk2", UInt8, vec![100, 100, 1, 2, 3]),
+                ("value", Int64, vec![2, 7, 4, 6, 1])
+            )
             .unwrap();
+            storage
+                .write(WriteRequest {
+                    batch,
+                    time_range: (1..10).into(),
+                    enable_check: true,
+                })
+                .await
+                .unwrap();
 
-        let result_stream = storage
-            .scan(ScanRequest {
-                range: TimeRange::new(Timestamp(0), Timestamp::MAX),
-                predicate: vec![],
-                projections: None,
-            })
-            .await
-            .unwrap();
-        let expected_batch = [
-            record_batch!(
-                ("pk1", UInt8, vec![5, 9, 10, 11]),
-                ("pk2", UInt8, vec![3, 1, 2, 99]),
-                ("value", Int64, vec![1, 44, 66, 77])
-            )
-            .unwrap(),
-            record_batch!(
-                ("pk1", UInt8, vec![11]),
-                ("pk2", UInt8, vec![100]),
-                ("value", Int64, vec![22])
+            let batch = record_batch!(
+                ("pk1", UInt8, vec![11, 11, 9, 10]),
+                ("pk2", UInt8, vec![100, 99, 1, 2]),
+                ("value", Int64, vec![22, 77, 44, 66])
             )
-            .unwrap(),
-        ];
+            .unwrap();
+            storage
+                .write(WriteRequest {
+                    batch,
+                    time_range: (10..20).into(),
+                    enable_check: true,
+                })
+                .await
+                .unwrap();
 
-        check_stream(result_stream, expected_batch).await;
+            let result_stream = storage
+                .scan(ScanRequest {
+                    range: TimeRange::new(Timestamp(0), Timestamp::MAX),
+                    predicate: vec![],
+                    projections: None,
+                })
+                .await
+                .unwrap();
+            let expected_batch = [
+                record_batch!(
+                    ("pk1", UInt8, vec![5, 9, 10, 11]),
+                    ("pk2", UInt8, vec![3, 1, 2, 99]),
+                    ("value", Int64, vec![1, 44, 66, 77])
+                )
+                .unwrap(),
+                record_batch!(
+                    ("pk1", UInt8, vec![11]),
+                    ("pk2", UInt8, vec![100]),
+                    ("value", Int64, vec![22])
+                )
+                .unwrap(),
+            ];
+
+            check_stream(result_stream, expected_batch).await;
+        });
     }
 
-    #[tokio::test]
-    async fn test_storage_sort_batch() {
+    #[test]
+    fn test_storage_sort_batch() {
         let schema = arrow_schema!(("a", UInt8), ("b", UInt8), ("c", UInt8), 
("c", UInt8));
         let root_dir = temp_dir::TempDir::new().unwrap();
         let store = Arc::new(LocalFileSystem::new());
@@ -511,32 +514,32 @@ mod tests {
             1,
             StorageOptions::default(),
         )
-        .await
-        .unwrap();
-
-        let batch = record_batch!(
-            ("a", UInt8, vec![2, 1, 3, 4, 8, 6, 5, 7]),
-            ("b", UInt8, vec![1, 3, 4, 8, 2, 6, 5, 7]),
-            ("c", UInt8, vec![8, 6, 2, 4, 3, 1, 5, 7]),
-            ("d", UInt8, vec![2, 7, 4, 6, 1, 3, 5, 8])
-        )
         .unwrap();
+        storage.runtimes.sst_compact_runtime.block_on(async {
+            let batch = record_batch!(
+                ("a", UInt8, vec![2, 1, 3, 4, 8, 6, 5, 7]),
+                ("b", UInt8, vec![1, 3, 4, 8, 2, 6, 5, 7]),
+                ("c", UInt8, vec![8, 6, 2, 4, 3, 1, 5, 7]),
+                ("d", UInt8, vec![2, 7, 4, 6, 1, 3, 5, 8])
+            )
+            .unwrap();
 
-        let mut sorted_batches = storage.sort_batch(batch).await.unwrap();
-        let expected_bacth = record_batch!(
-            ("a", UInt8, vec![1, 2, 3, 4, 5, 6, 7, 8]),
-            ("b", UInt8, vec![3, 1, 4, 8, 5, 6, 7, 2]),
-            ("c", UInt8, vec![6, 8, 2, 4, 5, 1, 7, 3]),
-            ("d", UInt8, vec![7, 2, 4, 6, 5, 3, 8, 1])
-        )
-        .unwrap();
-        let mut offset = 0;
-        while let Some(sorted_batch) = sorted_batches.next().await {
-            let sorted_batch = sorted_batch.unwrap();
-            let length = sorted_batch.num_rows();
-            let batch = expected_bacth.slice(offset, length);
-            assert_eq!(sorted_batch, batch);
-            offset += length;
-        }
+            let mut sorted_batches = storage.sort_batch(batch).await.unwrap();
+            let expected_bacth = record_batch!(
+                ("a", UInt8, vec![1, 2, 3, 4, 5, 6, 7, 8]),
+                ("b", UInt8, vec![3, 1, 4, 8, 5, 6, 7, 2]),
+                ("c", UInt8, vec![6, 8, 2, 4, 5, 1, 7, 3]),
+                ("d", UInt8, vec![7, 2, 4, 6, 5, 3, 8, 1])
+            )
+            .unwrap();
+            let mut offset = 0;
+            while let Some(sorted_batch) = sorted_batches.next().await {
+                let sorted_batch = sorted_batch.unwrap();
+                let length = sorted_batch.num_rows();
+                let batch = expected_bacth.slice(offset, length);
+                assert_eq!(sorted_batch, batch);
+                offset += length;
+            }
+        });
     }
 }
diff --git a/src/metric_engine/src/lib.rs b/src/metric_engine/src/util.rs
similarity index 75%
copy from src/metric_engine/src/lib.rs
copy to src/metric_engine/src/util.rs
index 26580cb3..f48e8834 100644
--- a/src/metric_engine/src/lib.rs
+++ b/src/metric_engine/src/util.rs
@@ -15,19 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Storage Engine for metrics.
+use std::time::{SystemTime, UNIX_EPOCH};
 
-#![feature(duration_constructors)]
-mod compaction;
-pub mod error;
-mod macros;
-mod manifest;
-pub mod operator;
-mod read;
-mod sst;
-pub mod storage;
-#[cfg(test)]
-mod test_util;
-pub mod types;
-
-pub use error::{AnyhowError, Error, Result};
+/// Current time in milliseconds.
+pub fn now() -> i64 {
+    let now = SystemTime::now();
+    let duration = now.duration_since(UNIX_EPOCH).unwrap();
+    duration.as_millis() as i64
+}
diff --git a/src/pb_types/protos/sst.proto b/src/pb_types/protos/sst.proto
index 5312ffa7..3c5d9040 100644
--- a/src/pb_types/protos/sst.proto
+++ b/src/pb_types/protos/sst.proto
@@ -41,11 +41,7 @@ message SstFile {
   SstMeta meta = 2;
 }
 
-message Manifest {
-  repeated SstFile files = 1;
-}
-
-message MetaUpdate {
+message ManifestUpdate {
   repeated SstFile to_adds = 1;
-  repeated uint64 to_removes = 2;
+  repeated uint64 to_deletes = 2;
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to