This is an automated email from the ASF dual-hosted git repository.

baojinri pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/horaedb.git


The following commit(s) were added to refs/heads/main by this push:
     new 6cdf2e66 feat: init compact scheduler (#1601)
6cdf2e66 is described below

commit 6cdf2e66a04f11c60c04cacae0111a14d344a501
Author: Jiacai Liu <[email protected]>
AuthorDate: Fri Nov 29 16:41:23 2024 +0800

    feat: init compact scheduler (#1601)
    
    ## Rationale
    Setup the basic structure for compaction
    
    ## Detailed Changes
    - Add a `compaction` module: a `TimeWindowCompactionStrategy` picker
      (candidate picking still `todo!()`) and a channel-based `Scheduler`
      with a picker loop and a task-runner loop.
    - Turn `SstFile` into a cheaply clonable handle over `Arc`-wrapped inner
      state, with accessor methods and an `in_compaction` flag.
    - Add `SstPathGenerator` so SST paths (`{prefix}/data/{id}.sst`) are built
      in one place; `Manifest` now prepends its own `manifest` prefix.
    - Split the compact runtime into separate manifest and SST compaction
      runtimes.

    ## Test Plan
    Old CI; note the scheduler itself has no tests yet.
---
 horaedb/Cargo.lock                                 |   7 +
 horaedb/Cargo.toml                                 |   1 +
 horaedb/metric_engine/Cargo.toml                   |   1 +
 .../src/{lib.rs => compaction/mod.rs}              |  26 ++--
 .../src/{lib.rs => compaction/picker.rs}           |  26 ++--
 horaedb/metric_engine/src/compaction/scheduler.rs  | 166 +++++++++++++++++++++
 horaedb/metric_engine/src/lib.rs                   |   1 +
 horaedb/metric_engine/src/manifest.rs              |  36 +++--
 horaedb/metric_engine/src/sst.rs                   |  77 ++++++++--
 horaedb/metric_engine/src/storage.rs               |  93 +++++++-----
 horaedb/metric_engine/src/types.rs                 |   9 +-
 11 files changed, 360 insertions(+), 83 deletions(-)

diff --git a/horaedb/Cargo.lock b/horaedb/Cargo.lock
index 1d68e849..f2f8003b 100644
--- a/horaedb/Cargo.lock
+++ b/horaedb/Cargo.lock
@@ -469,6 +469,12 @@ version = "1.7.1"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "8318a53db07bb3f8dca91a600466bdb3f2eaadeedfdbcf02e1accbad9271ba50"
 
+[[package]]
+name = "bytesize"
+version = "1.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "a3e368af43e418a04d52505cf3dbc23dda4e3407ae2fa99fd0e4f308ce546acc"
+
 [[package]]
 name = "bzip2"
 version = "0.4.4"
@@ -1569,6 +1575,7 @@ dependencies = [
  "async-scoped",
  "async-trait",
  "bytes",
+ "bytesize",
  "datafusion",
  "futures",
  "itertools 0.3.25",
diff --git a/horaedb/Cargo.toml b/horaedb/Cargo.toml
index d40bdafd..863c789f 100644
--- a/horaedb/Cargo.toml
+++ b/horaedb/Cargo.toml
@@ -37,6 +37,7 @@ macros = { path = "../src/components/macros" }
 pb_types = { path = "pb_types" }
 prost = { version = "0.13" }
 arrow = { version = "53", features = ["prettyprint"] }
+bytesize = "1"
 arrow-schema = "53"
 tokio = { version = "1", features = ["full"] }
 async-trait = "0.1"
diff --git a/horaedb/metric_engine/Cargo.toml b/horaedb/metric_engine/Cargo.toml
index 7235fc68..d0e31a04 100644
--- a/horaedb/metric_engine/Cargo.toml
+++ b/horaedb/metric_engine/Cargo.toml
@@ -37,6 +37,7 @@ arrow-schema = { workspace = true }
 async-scoped = { workspace = true }
 async-trait = { workspace = true }
 bytes = { workspace = true }
+bytesize = { workspace = true }
 datafusion = { workspace = true }
 futures = { workspace = true }
 itertools = { workspace = true }
diff --git a/horaedb/metric_engine/src/lib.rs b/horaedb/metric_engine/src/compaction/mod.rs
similarity index 70%
copy from horaedb/metric_engine/src/lib.rs
copy to horaedb/metric_engine/src/compaction/mod.rs
index e0dfdb3c..a661c48b 100644
--- a/horaedb/metric_engine/src/lib.rs
+++ b/horaedb/metric_engine/src/compaction/mod.rs
@@ -15,16 +15,20 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Storage Engine for metrics.
+mod picker;
+mod scheduler;
 
-#![feature(duration_constructors)]
-pub mod error;
-mod macros;
-mod manifest;
-mod read;
-mod sst;
-pub mod storage;
-mod test_util;
-pub mod types;
+pub use scheduler::{Scheduler as CompactionScheduler, SchedulerConfig};
 
-pub use error::{AnyhowError, Error, Result};
+use crate::sst::{FileId, SstFile};
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct Input {
+    files: Vec<SstFile>,
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct Task {
+    pub inputs: Vec<FileId>,
+    pub expireds: Vec<FileId>,
+}
diff --git a/horaedb/metric_engine/src/lib.rs b/horaedb/metric_engine/src/compaction/picker.rs
similarity index 68%
copy from horaedb/metric_engine/src/lib.rs
copy to horaedb/metric_engine/src/compaction/picker.rs
index e0dfdb3c..9e41acf8 100644
--- a/horaedb/metric_engine/src/lib.rs
+++ b/horaedb/metric_engine/src/compaction/picker.rs
@@ -15,16 +15,20 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Storage Engine for metrics.
+use std::time::Duration;
 
-#![feature(duration_constructors)]
-pub mod error;
-mod macros;
-mod manifest;
-mod read;
-mod sst;
-pub mod storage;
-mod test_util;
-pub mod types;
+use crate::{compaction::Task, sst::SstFile};
 
-pub use error::{AnyhowError, Error, Result};
+pub struct TimeWindowCompactionStrategy {
+    segment_duration: Duration,
+}
+
+impl TimeWindowCompactionStrategy {
+    pub fn new(segment_duration: Duration) -> Self {
+        Self { segment_duration }
+    }
+
+    pub fn pick_candidate(&self, _ssts: Vec<SstFile>) -> Option<Task> {
+        todo!()
+    }
+}
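
pick_candidate is left as todo!() above. A hedged sketch of the usual
time-window approach (an assumption, not this commit's logic): bucket SSTs by
time_range.start / segment_duration and propose the densest window, using a
stand-in Sst type instead of SstFile:

    // Sketch only: a dense-window heuristic is assumed; `Sst` stands in
    // for SstFile, with `start_ms` playing meta().time_range.start.0.
    use std::{collections::HashMap, time::Duration};

    struct Sst {
        id: u64,
        start_ms: i64,
    }

    fn pick_candidate(ssts: &[Sst], segment_duration: Duration) -> Option<Vec<u64>> {
        let segment_ms = segment_duration.as_millis() as i64;
        // Bucket files into fixed windows, mirroring the group_by key
        // build_scan_plan uses below: start_ms / segment_ms.
        let mut buckets: HashMap<i64, Vec<u64>> = HashMap::new();
        for sst in ssts {
            buckets.entry(sst.start_ms / segment_ms).or_default().push(sst.id);
        }
        // Propose the window with the most files; one file needs no merge.
        buckets.into_values().filter(|ids| ids.len() > 1).max_by_key(|ids| ids.len())
    }
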
diff --git a/horaedb/metric_engine/src/compaction/scheduler.rs b/horaedb/metric_engine/src/compaction/scheduler.rs
new file mode 100644
index 00000000..4d311e49
--- /dev/null
+++ b/horaedb/metric_engine/src/compaction/scheduler.rs
@@ -0,0 +1,166 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{
+    sync::{atomic::AtomicU64, Arc},
+    time::Duration,
+};
+
+use anyhow::Context;
+use tokio::{
+    sync::mpsc::{self, Receiver, Sender},
+    task::JoinHandle,
+    time::sleep,
+};
+use tracing::warn;
+
+use crate::{
+    compaction::{picker::TimeWindowCompactionStrategy, Task},
+    manifest::ManifestRef,
+    sst::SstPathGenerator,
+    types::{ObjectStoreRef, RuntimeRef},
+    Result,
+};
+
+pub struct Scheduler {
+    runtime: RuntimeRef,
+
+    task_tx: Sender<Task>,
+    inused_memory: AtomicU64,
+    task_handle: JoinHandle<()>,
+    picker_handle: JoinHandle<()>,
+}
+
+impl Scheduler {
+    pub fn new(
+        runtime: RuntimeRef,
+        manifest: ManifestRef,
+        store: ObjectStoreRef,
+        segment_duration: Duration,
+        sst_path_gen: Arc<SstPathGenerator>,
+        config: SchedulerConfig,
+    ) -> Self {
+        let (task_tx, task_rx) = mpsc::channel(config.max_pending_compaction_tasks);
+        let task_handle = {
+            let rt = runtime.clone();
+            let store = store.clone();
+            let manifest = manifest.clone();
+            runtime.spawn(async move {
+                Self::recv_task_loop(
+                    rt,
+                    task_rx,
+                    store,
+                    manifest,
+                    sst_path_gen,
+                    config.memory_limit,
+                )
+                .await;
+            })
+        };
+        let picker_handle = {
+            let task_tx = task_tx.clone();
+            let interval = config.schedule_interval;
+            runtime.spawn(async move {
+                Self::generate_task_loop(manifest, task_tx, interval, segment_duration).await;
+            })
+        };
+
+        Self {
+            runtime,
+            task_tx,
+            task_handle,
+            picker_handle,
+            inused_memory: AtomicU64::new(0),
+        }
+    }
+
+    pub fn try_send(&self, task: Task) -> Result<()> {
+        self.task_tx
+            .try_send(task)
+            .context("failed to send task to scheduler")?;
+
+        Ok(())
+    }
+
+    async fn recv_task_loop(
+        rt: RuntimeRef,
+        mut task_rx: Receiver<Task>,
+        store: ObjectStoreRef,
+        manifest: ManifestRef,
+        _sst_path_gen: Arc<SstPathGenerator>,
+        _mem_limit: u64,
+    ) {
+        while let Some(task) = task_rx.recv().await {
+            let store = store.clone();
+            let manifest = manifest.clone();
+            rt.spawn(async move {
+                let runner = Runner { store, manifest };
+                if let Err(e) = runner.do_compaction(task).await {
+                    warn!("Do compaction failed, err:{e}");
+                }
+            });
+        }
+    }
+
+    async fn generate_task_loop(
+        manifest: ManifestRef,
+        task_tx: Sender<Task>,
+        schedule_interval: Duration,
+        segment_duration: Duration,
+    ) {
+        let compactor = TimeWindowCompactionStrategy::new(segment_duration);
+        loop {
+            let ssts = manifest.all_ssts().await;
+            if let Some(task) = compactor.pick_candidate(ssts) {
+                if let Err(e) = task_tx.try_send(task) {
+                    warn!("Send task failed, err:{e}");
+                }
+            }
+
+            sleep(schedule_interval).await;
+        }
+    }
+}
+
+pub struct SchedulerConfig {
+    pub schedule_interval: Duration,
+    pub memory_limit: u64,
+    pub max_pending_compaction_tasks: usize,
+}
+
+impl Default for SchedulerConfig {
+    fn default() -> Self {
+        Self {
+            schedule_interval: Duration::from_secs(30),
+            memory_limit: bytesize::gb(2_u64),
+            max_pending_compaction_tasks: 10,
+        }
+    }
+}
+
+pub struct Runner {
+    store: ObjectStoreRef,
+    manifest: ManifestRef,
+}
+
+impl Runner {
+    // TODO: Merge input sst files into one new sst file
+    // and delete the expired sst files
+    async fn do_compaction(&self, _task: Task) -> Result<()> {
+        todo!()
+    }
+}
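
The scheduler above is two loops joined by a bounded mpsc channel:
generate_task_loop periodically picks a candidate and try_sends it, while
recv_task_loop spawns a runner per received task. A minimal standalone sketch
of that shape in plain tokio (Task is a stand-in type; the channel bound is
what max_pending_compaction_tasks caps):

    use std::time::Duration;
    use tokio::{sync::mpsc, time::sleep};

    #[derive(Debug)]
    struct Task(u64);

    #[tokio::main]
    async fn main() {
        // Bounded queue: try_send fails fast when full instead of letting
        // pending compaction tasks pile up without limit.
        let (task_tx, mut task_rx) = mpsc::channel::<Task>(4);

        // Picker loop: propose a task per tick; log (don't block) on overflow.
        let picker = tokio::spawn(async move {
            for id in 0..3 {
                if let Err(e) = task_tx.try_send(Task(id)) {
                    eprintln!("Send task failed, err:{e}");
                }
                sleep(Duration::from_millis(10)).await;
            }
            // Dropping task_tx here closes the channel, ending the recv loop.
        });

        // Runner loop: one spawned task per compaction, as in recv_task_loop.
        while let Some(task) = task_rx.recv().await {
            tokio::spawn(async move {
                println!("compacting {task:?}");
            });
        }
        picker.await.unwrap();
    }
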
diff --git a/horaedb/metric_engine/src/lib.rs b/horaedb/metric_engine/src/lib.rs
index e0dfdb3c..c77cdbb3 100644
--- a/horaedb/metric_engine/src/lib.rs
+++ b/horaedb/metric_engine/src/lib.rs
@@ -18,6 +18,7 @@
 //! Storage Engine for metrics.
 
 #![feature(duration_constructors)]
+mod compaction;
 pub mod error;
 mod macros;
 mod manifest;
diff --git a/horaedb/metric_engine/src/manifest.rs b/horaedb/metric_engine/src/manifest.rs
index e435e699..37646348 100644
--- a/horaedb/metric_engine/src/manifest.rs
+++ b/horaedb/metric_engine/src/manifest.rs
@@ -41,7 +41,7 @@ use tracing::error;
 
 use crate::{
     sst::{FileId, FileMeta, SstFile},
-    types::{ManifestMergeOptions, ObjectStoreRef, TimeRange},
+    types::{ManifestMergeOptions, ObjectStoreRef, RuntimeRef, TimeRange},
     AnyhowError, Error, Result,
 };
 
@@ -49,6 +49,8 @@ pub const PREFIX_PATH: &str = "manifest";
 pub const SNAPSHOT_FILENAME: &str = "snapshot";
 pub const DELTA_PREFIX: &str = "delta";
 
+pub type ManifestRef = Arc<Manifest>;
+
 pub struct Manifest {
     delta_dir: Path,
     store: ObjectStoreRef,
@@ -66,7 +68,7 @@ impl Payload {
     // efficient
     pub fn dedup_files(&mut self) {
         let mut seen = HashSet::with_capacity(self.files.len());
-        self.files.retain(|file| seen.insert(file.id));
+        self.files.retain(|file| seen.insert(file.id()));
     }
 }
 
@@ -103,8 +105,8 @@ impl Manifest {
         runtime: Arc<Runtime>,
         merge_options: ManifestMergeOptions,
     ) -> Result<Self> {
-        let snapshot_path = Path::from(format!("{root_dir}/{SNAPSHOT_FILENAME}"));
-        let delta_dir = Path::from(format!("{root_dir}/{DELTA_PREFIX}"));
+        let snapshot_path = Path::from(format!("{root_dir}/{PREFIX_PATH}/{SNAPSHOT_FILENAME}"));
+        let delta_dir = Path::from(format!("{root_dir}/{PREFIX_PATH}/{DELTA_PREFIX}"));
 
         let merger = ManifestMerger::try_new(
             snapshot_path.clone(),
@@ -137,7 +139,7 @@ impl Manifest {
         self.merger.maybe_schedule_merge().await?;
 
         let new_sst_path = Path::from(format!("{}/{id}", self.delta_dir));
-        let new_sst = SstFile { id, meta };
+        let new_sst = SstFile::new(id, meta);
 
         let new_sst_payload = pb_types::SstFile::from(new_sst.clone());
+        let mut buf: Vec<u8> = Vec::with_capacity(new_sst_payload.encoded_len());
@@ -164,13 +166,19 @@ impl Manifest {
         Ok(())
     }
 
+    // TODO: avoid clone
+    pub async fn all_ssts(&self) -> Vec<SstFile> {
+        let payload = self.payload.read().await;
+        payload.files.clone()
+    }
+
     pub async fn find_ssts(&self, time_range: &TimeRange) -> Vec<SstFile> {
         let payload = self.payload.read().await;
 
         payload
             .files
             .iter()
-            .filter(move |f| f.meta.time_range.overlaps(time_range))
+            .filter(move |f| f.meta().time_range.overlaps(time_range))
             .cloned()
             .collect()
     }
@@ -185,7 +193,7 @@ struct ManifestMerger {
     snapshot_path: Path,
     delta_dir: Path,
     store: ObjectStoreRef,
-    runtime: Arc<Runtime>,
+    runtime: RuntimeRef,
     sender: Sender<MergeType>,
     receiver: RwLock<Receiver<MergeType>>,
     deltas_num: AtomicUsize,
@@ -442,12 +450,12 @@ mod tests {
                     size: i as u32,
                     time_range,
                 };
-                SstFile { id, meta }
+                SstFile::new(id, meta)
             })
             .collect::<Vec<_>>();
 
-        expected_ssts.sort_by(|a, b| a.id.cmp(&b.id));
-        ssts.sort_by(|a, b| a.id.cmp(&b.id));
+        expected_ssts.sort_by_key(|a| a.id());
+        ssts.sort_by_key(|a| a.id());
         assert_eq!(expected_ssts, ssts);
     }
 
@@ -458,8 +466,8 @@ mod tests {
             .path()
             .to_string_lossy()
             .to_string();
-        let snapshot_path = Path::from(format!("{root_dir}/{SNAPSHOT_FILENAME}"));
-        let delta_dir = Path::from(format!("{root_dir}/{DELTA_PREFIX}"));
+        let snapshot_path = Path::from(format!("{root_dir}/{PREFIX_PATH}/{SNAPSHOT_FILENAME}"));
+        let delta_dir = Path::from(format!("{root_dir}/{PREFIX_PATH}/{DELTA_PREFIX}"));
         let runtime = tokio::runtime::Builder::new_multi_thread()
             .worker_threads(4)
             .enable_all()
@@ -497,8 +505,8 @@ mod tests {
         let mut mem_ssts = manifest.payload.read().await.files.clone();
         let mut ssts = read_snapshot(&store, &snapshot_path).await.unwrap().files;
 
-        mem_ssts.sort_by(|a, b| a.id.cmp(&b.id));
-        ssts.sort_by(|a, b| a.id.cmp(&b.id));
+        mem_ssts.sort_by_key(|a| a.id());
+        ssts.sort_by_key(|a| a.id());
         assert_eq!(mem_ssts, ssts);
 
         let delta_paths = list_delta_paths(&store, &delta_dir).await.unwrap();
diff --git a/horaedb/metric_engine/src/sst.rs b/horaedb/metric_engine/src/sst.rs
index 644988f8..56c3a225 100644
--- a/horaedb/metric_engine/src/sst.rs
+++ b/horaedb/metric_engine/src/sst.rs
@@ -17,8 +17,8 @@
 
 use std::{
     sync::{
-        atomic::{AtomicU64, Ordering},
-        LazyLock,
+        atomic::{AtomicBool, AtomicU64, Ordering},
+        Arc, LazyLock,
     },
     time::SystemTime,
 };
@@ -27,14 +27,50 @@ use macros::ensure;
 
 use crate::{types::TimeRange, Error};
 
-pub const PREFIX_PATH: &str = "data";
+const PREFIX_PATH: &str = "data";
 
 pub type FileId = u64;
 
-#[derive(Clone, Debug, PartialEq, Eq)]
+#[derive(Clone, Debug)]
 pub struct SstFile {
-    pub id: FileId,
-    pub meta: FileMeta,
+    inner: Arc<Inner>,
+}
+
+#[derive(Debug)]
+struct Inner {
+    id: FileId,
+    meta: FileMeta,
+
+    in_compaction: AtomicBool,
+}
+
+impl Inner {
+    pub fn new(id: FileId, meta: FileMeta) -> Self {
+        Self {
+            id,
+            meta,
+            in_compaction: AtomicBool::new(false),
+        }
+    }
+}
+
+impl SstFile {
+    pub fn new(id: FileId, meta: FileMeta) -> Self {
+        let inner = Arc::new(Inner::new(id, meta));
+        Self { inner }
+    }
+
+    pub fn id(&self) -> FileId {
+        self.inner.id
+    }
+
+    pub fn meta(&self) -> &FileMeta {
+        &self.inner.meta
+    }
+
+    pub fn mark_compaction(&self) {
+        self.inner.in_compaction.store(true, Ordering::Relaxed);
+    }
 }
 
 impl TryFrom<pb_types::SstFile> for SstFile {
@@ -45,19 +81,27 @@ impl TryFrom<pb_types::SstFile> for SstFile {
         let meta = value.meta.unwrap();
         let meta = meta.try_into()?;
 
-        Ok(Self { id: value.id, meta })
+        Ok(Self::new(value.id, meta))
     }
 }
 
 impl From<SstFile> for pb_types::SstFile {
     fn from(value: SstFile) -> Self {
         pb_types::SstFile {
-            id: value.id,
-            meta: Some(value.meta.into()),
+            id: value.id(),
+            meta: Some(value.meta().clone().into()),
         }
     }
 }
 
+impl PartialEq for SstFile {
+    fn eq(&self, other: &Self) -> bool {
+        self.id() == other.id() && self.meta() == other.meta()
+    }
+}
+
+impl Eq for SstFile {}
+
 #[derive(Clone, Debug, PartialEq, Eq)]
 pub struct FileMeta {
     pub max_sequence: u64,
@@ -112,3 +156,18 @@ static NEXT_ID: LazyLock<AtomicU64> = LazyLock::new(|| {
 pub fn allocate_id() -> u64 {
     NEXT_ID.fetch_add(1, Ordering::SeqCst)
 }
+
+#[derive(Debug, Clone)]
+pub struct SstPathGenerator {
+    prefix: String,
+}
+
+impl SstPathGenerator {
+    pub fn new(prefix: String) -> Self {
+        Self { prefix }
+    }
+
+    pub fn generate(&self, id: FileId) -> String {
+        format!("{}/{}/{}.sst", self.prefix, PREFIX_PATH, id)
+    }
+}
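
SstFile is now a shared handle: clone copies an Arc, and in_compaction is a
single AtomicBool observed through every clone. A tiny standalone sketch of
the pattern (FileHandle is a hypothetical stand-in):

    use std::sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    };

    #[derive(Clone)]
    struct FileHandle {
        in_compaction: Arc<AtomicBool>, // shared, like Inner.in_compaction
    }

    fn main() {
        let a = FileHandle { in_compaction: Arc::new(AtomicBool::new(false)) };
        let b = a.clone(); // cheap: copies the Arc, not the file metadata

        a.in_compaction.store(true, Ordering::Relaxed); // mark_compaction()
        // The flag set through one clone is visible through the other.
        assert!(b.in_compaction.load(Ordering::Relaxed));
    }
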
diff --git a/horaedb/metric_engine/src/storage.rs b/horaedb/metric_engine/src/storage.rs
index ac37e9c8..e30cb2a8 100644
--- a/horaedb/metric_engine/src/storage.rs
+++ b/horaedb/metric_engine/src/storage.rs
@@ -57,9 +57,10 @@ use parquet::{
 use tokio::runtime::Runtime;
 
 use crate::{
-    manifest::Manifest,
+    compaction::{CompactionScheduler, SchedulerConfig},
+    manifest::{Manifest, ManifestRef},
     read::{DefaultParquetFileReaderFactory, MergeExec},
-    sst::{allocate_id, FileId, FileMeta, SstFile},
+    sst::{allocate_id, FileMeta, SstFile, SstPathGenerator},
     types::{
         ObjectStoreRef, RuntimeOptions, StorageOptions, TimeRange, WriteOptions, WriteResult,
         SEQ_COLUMN_NAME,
@@ -97,21 +98,31 @@ pub trait TimeMergeStorage {
     async fn compact(&self, req: CompactRequest) -> Result<()>;
 }
 
+#[derive(Clone)]
 struct StorageRuntimes {
-    compact_runtime: Arc<Runtime>,
+    manifest_compact_runtime: Arc<Runtime>,
+    sst_compact_runtime: Arc<Runtime>,
 }
 
 impl StorageRuntimes {
     pub fn new(runtime_opts: RuntimeOptions) -> Result<Self> {
-        let compact_runtime = tokio::runtime::Builder::new_multi_thread()
-            .thread_name("storage-compact")
-            .worker_threads(runtime_opts.compact_thread_num)
+        let manifest_compact_runtime = tokio::runtime::Builder::new_multi_thread()
+            .thread_name("man-compact")
+            .worker_threads(runtime_opts.manifest_compact_thread_num)
+            .enable_all()
+            .build()
+            .context("build storgae compact runtime")?;
+
+        let sst_compact_runtime = tokio::runtime::Builder::new_multi_thread()
+            .thread_name("sst-compact")
+            .worker_threads(runtime_opts.sst_compact_thread_num)
             .enable_all()
             .build()
             .context("build storgae compact runtime")?;
 
         Ok(Self {
-            compact_runtime: Arc::new(compact_runtime),
+            manifest_compact_runtime: Arc::new(manifest_compact_runtime),
+            sst_compact_runtime: Arc::new(sst_compact_runtime),
         })
     }
 }
@@ -127,11 +138,13 @@ pub struct CloudObjectStorage {
     store: ObjectStoreRef,
     arrow_schema: SchemaRef,
     num_primary_keys: usize,
-    manifest: Manifest,
+    manifest: ManifestRef,
     runtimes: StorageRuntimes,
 
     df_schema: DFSchema,
     write_props: WriterProperties,
+    sst_path_gen: Arc<SstPathGenerator>,
+    compact_scheduler: CompactionScheduler,
 }
 
 /// It will organize the data in the following way:
@@ -154,16 +167,17 @@ impl CloudObjectStorage {
         num_primary_keys: usize,
         storage_opts: StorageOptions,
     ) -> Result<Self> {
-        let manifest_prefix = crate::manifest::PREFIX_PATH;
         let runtimes = StorageRuntimes::new(storage_opts.runtime_opts)?;
 
-        let manifest = Manifest::try_new(
-            format!("{path}/{manifest_prefix}"),
-            store.clone(),
-            runtimes.compact_runtime.clone(),
-            storage_opts.manifest_merge_opts,
-        )
-        .await?;
+        let manifest = Arc::new(
+            Manifest::try_new(
+                path.clone(),
+                store.clone(),
+                runtimes.manifest_compact_runtime.clone(),
+                storage_opts.manifest_merge_opts,
+            )
+            .await?,
+        );
         let mut new_fields = arrow_schema.fields.clone().to_vec();
         new_fields.push(Arc::new(Field::new(
             SEQ_COLUMN_NAME,
@@ -176,6 +190,15 @@ impl CloudObjectStorage {
         ));
         let df_schema = DFSchema::try_from(arrow_schema.clone()).context("build DFSchema")?;
         let write_props = Self::build_write_props(storage_opts.write_opts, num_primary_keys);
+        let sst_path_gen = Arc::new(SstPathGenerator::new(path.clone()));
+        let compact_scheduler = CompactionScheduler::new(
+            runtimes.sst_compact_runtime.clone(),
+            manifest.clone(),
+            store.clone(),
+            segment_duration,
+            sst_path_gen.clone(),
+            SchedulerConfig::default(),
+        );
         Ok(Self {
             path,
             num_primary_keys,
@@ -186,18 +209,14 @@ impl CloudObjectStorage {
             runtimes,
             df_schema,
             write_props,
+            sst_path_gen,
+            compact_scheduler,
         })
     }
 
-    fn build_file_path(&self, id: FileId) -> String {
-        let root = &self.path;
-        let prefix = crate::sst::PREFIX_PATH;
-        format!("{root}/{prefix}/{id}")
-    }
-
     async fn write_batch(&self, batch: RecordBatch) -> Result<WriteResult> {
         let file_id = allocate_id();
-        let file_path = self.build_file_path(file_id);
+        let file_path = self.sst_path_gen.generate(file_id);
         let file_path = Path::from(file_path);
         let object_store_writer = ParquetObjectWriter::new(self.store.clone(), file_path.clone());
         let mut writer = AsyncArrowWriter::try_new(
@@ -324,8 +343,8 @@ impl CloudObjectStorage {
             .into_iter()
             .map(|f| {
                 vec![PartitionedFile::new(
-                    self.build_file_path(f.id),
-                    f.meta.size as u64,
+                    self.sst_path_gen.generate(f.id()),
+                    f.meta().size as u64,
                 )]
             })
             .collect::<Vec<_>>();
@@ -405,7 +424,7 @@ impl TimeMergeStorage for CloudObjectStorage {
         }
 
         let ssts_by_segment = total_ssts.into_iter().group_by(|file| {
-            file.meta.time_range.start.0 / self.segment_duration.as_millis() as i64
+            file.meta().time_range.start.0 / self.segment_duration.as_millis() as i64
         });
 
         let mut plan_for_all_segments = Vec::new();
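
ssts_by_segment keys each file by integer division of its start timestamp by
the segment length, so files in one window share a key. A quick standalone
check of that arithmetic, assuming a 2-hour segment:

    use std::time::Duration;

    fn main() {
        let segment_ms = Duration::from_secs(2 * 60 * 60).as_millis() as i64;
        // 0 ms and 7_199_999 ms fall in segment 0; 7_200_000 ms starts segment 1.
        for start_ms in [0_i64, 7_199_999, 7_200_000] {
            println!("start {start_ms} -> segment {}", start_ms / segment_ms);
        }
    }
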
@@ -429,7 +448,7 @@ impl TimeMergeStorage for CloudObjectStorage {
         return Ok(res);
     }
 
-    async fn compact(&self, req: CompactRequest) -> Result<()> {
+    async fn compact(&self, _req: CompactRequest) -> Result<()> {
         todo!()
     }
 }
@@ -460,14 +479,16 @@ mod tests {
         let plan = storage
             .build_scan_plan(
                 (100..103)
-                    .map(|id| SstFile {
-                        id,
-                        meta: FileMeta {
-                            max_sequence: id,
-                            num_rows: 1,
-                            size: 1,
-                            time_range: (1..10).into(),
-                        },
+                    .map(|id| {
+                        SstFile::new(
+                            id,
+                            FileMeta {
+                                max_sequence: id,
+                                num_rows: 1,
+                                size: 1,
+                                time_range: (1..10).into(),
+                            },
+                        )
                     })
                     .collect(),
                 None,
@@ -481,7 +502,7 @@ mod tests {
         assert_eq!(
             r#"MergeExec: [primary_keys: 1, seq_idx: 1]
   SortPreservingMergeExec: [pk1@0 ASC, __seq__@1 ASC], fetch=1024
-    ParquetExec: file_groups={3 groups: [[mock/data/100], [mock/data/101], [mock/data/102]]}, projection=[pk1, __seq__], output_orderings=[[pk1@0 ASC, __seq__@1 ASC], [pk1@0 ASC, __seq__@1 ASC], [pk1@0 ASC, __seq__@1 ASC]]
+    ParquetExec: file_groups={3 groups: [[mock/data/100.sst], [mock/data/101.sst], [mock/data/102.sst]]}, projection=[pk1, __seq__], output_orderings=[[pk1@0 ASC, __seq__@1 ASC], [pk1@0 ASC, __seq__@1 ASC], [pk1@0 ASC, __seq__@1 ASC]]
 "#,
             format!("{display_plan}")
         );
diff --git a/horaedb/metric_engine/src/types.rs b/horaedb/metric_engine/src/types.rs
index 98ba9764..8e7c994e 100644
--- a/horaedb/metric_engine/src/types.rs
+++ b/horaedb/metric_engine/src/types.rs
@@ -23,6 +23,7 @@ use std::{
 
 use object_store::ObjectStore;
 use parquet::basic::{Compression, Encoding, ZstdLevel};
+use tokio::runtime::Runtime;
 
 use crate::sst::FileId;
 
@@ -30,6 +31,8 @@ use crate::sst::FileId;
 // user-defined schema.
 pub const SEQ_COLUMN_NAME: &str = "__seq__";
 
+pub type RuntimeRef = Arc<Runtime>;
+
 #[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
 pub struct Timestamp(pub i64);
 
@@ -148,13 +151,15 @@ impl Default for WriteOptions {
 }
 
 pub struct RuntimeOptions {
-    pub compact_thread_num: usize,
+    pub manifest_compact_thread_num: usize,
+    pub sst_compact_thread_num: usize,
 }
 
 impl Default for RuntimeOptions {
     fn default() -> Self {
         Self {
-            compact_thread_num: 4,
+            manifest_compact_thread_num: 2,
+            sst_compact_thread_num: 4,
         }
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
