This is an automated email from the ASF dual-hosted git repository.
baojinri pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/horaedb.git
The following commit(s) were added to refs/heads/main by this push:
new 6cdf2e66 feat: init compact scheduler (#1601)
6cdf2e66 is described below
commit 6cdf2e66a04f11c60c04cacae0111a14d344a501
Author: Jiacai Liu <[email protected]>
AuthorDate: Fri Nov 29 16:41:23 2024 +0800
feat: init compact scheduler (#1601)
## Rationale
Set up the basic structure for compaction.
## Detailed Changes
## Test Plan
Existing CI only; the scheduler has no tests yet.
---
horaedb/Cargo.lock | 7 +
horaedb/Cargo.toml | 1 +
horaedb/metric_engine/Cargo.toml | 1 +
.../src/{lib.rs => compaction/mod.rs} | 26 ++--
.../src/{lib.rs => compaction/picker.rs} | 26 ++--
horaedb/metric_engine/src/compaction/scheduler.rs | 166 +++++++++++++++++++++
horaedb/metric_engine/src/lib.rs | 1 +
horaedb/metric_engine/src/manifest.rs | 36 +++--
horaedb/metric_engine/src/sst.rs | 77 ++++++++--
horaedb/metric_engine/src/storage.rs | 93 +++++++-----
horaedb/metric_engine/src/types.rs | 9 +-
11 files changed, 360 insertions(+), 83 deletions(-)
diff --git a/horaedb/Cargo.lock b/horaedb/Cargo.lock
index 1d68e849..f2f8003b 100644
--- a/horaedb/Cargo.lock
+++ b/horaedb/Cargo.lock
@@ -469,6 +469,12 @@ version = "1.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8318a53db07bb3f8dca91a600466bdb3f2eaadeedfdbcf02e1accbad9271ba50"
+[[package]]
+name = "bytesize"
+version = "1.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a3e368af43e418a04d52505cf3dbc23dda4e3407ae2fa99fd0e4f308ce546acc"
+
[[package]]
name = "bzip2"
version = "0.4.4"
@@ -1569,6 +1575,7 @@ dependencies = [
"async-scoped",
"async-trait",
"bytes",
+ "bytesize",
"datafusion",
"futures",
"itertools 0.3.25",
diff --git a/horaedb/Cargo.toml b/horaedb/Cargo.toml
index d40bdafd..863c789f 100644
--- a/horaedb/Cargo.toml
+++ b/horaedb/Cargo.toml
@@ -37,6 +37,7 @@ macros = { path = "../src/components/macros" }
pb_types = { path = "pb_types" }
prost = { version = "0.13" }
arrow = { version = "53", features = ["prettyprint"] }
+bytesize = "1"
arrow-schema = "53"
tokio = { version = "1", features = ["full"] }
async-trait = "0.1"
diff --git a/horaedb/metric_engine/Cargo.toml b/horaedb/metric_engine/Cargo.toml
index 7235fc68..d0e31a04 100644
--- a/horaedb/metric_engine/Cargo.toml
+++ b/horaedb/metric_engine/Cargo.toml
@@ -37,6 +37,7 @@ arrow-schema = { workspace = true }
async-scoped = { workspace = true }
async-trait = { workspace = true }
bytes = { workspace = true }
+bytesize = { workspace = true }
datafusion = { workspace = true }
futures = { workspace = true }
itertools = { workspace = true }
diff --git a/horaedb/metric_engine/src/lib.rs
b/horaedb/metric_engine/src/compaction/mod.rs
similarity index 70%
copy from horaedb/metric_engine/src/lib.rs
copy to horaedb/metric_engine/src/compaction/mod.rs
index e0dfdb3c..a661c48b 100644
--- a/horaedb/metric_engine/src/lib.rs
+++ b/horaedb/metric_engine/src/compaction/mod.rs
@@ -15,16 +15,20 @@
// specific language governing permissions and limitations
// under the License.
-//! Storage Engine for metrics.
+mod picker;
+mod scheduler;
-#![feature(duration_constructors)]
-pub mod error;
-mod macros;
-mod manifest;
-mod read;
-mod sst;
-pub mod storage;
-mod test_util;
-pub mod types;
+pub use scheduler::{Scheduler as CompactionScheduler, SchedulerConfig};
-pub use error::{AnyhowError, Error, Result};
+use crate::sst::{FileId, SstFile};
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct Input { // A list of SST files; nothing else in this patch constructs or reads it.
+ files: Vec<SstFile>, // private, with no constructor or accessor yet; presumably the files a compaction reads — TODO confirm
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct Task { // Unit of work sent over the scheduler's channel and handed to `Runner::do_compaction`.
+ pub inputs: Vec<FileId>, // presumably the ids of the SSTs to merge (see the TODO on `Runner::do_compaction`) — confirm once implemented
+ pub expireds: Vec<FileId>, // presumably the ids of expired SSTs to delete (same TODO) — confirm once implemented
+}
diff --git a/horaedb/metric_engine/src/lib.rs
b/horaedb/metric_engine/src/compaction/picker.rs
similarity index 68%
copy from horaedb/metric_engine/src/lib.rs
copy to horaedb/metric_engine/src/compaction/picker.rs
index e0dfdb3c..9e41acf8 100644
--- a/horaedb/metric_engine/src/lib.rs
+++ b/horaedb/metric_engine/src/compaction/picker.rs
@@ -15,16 +15,20 @@
// specific language governing permissions and limitations
// under the License.
-//! Storage Engine for metrics.
+use std::time::Duration;
-#![feature(duration_constructors)]
-pub mod error;
-mod macros;
-mod manifest;
-mod read;
-mod sst;
-pub mod storage;
-mod test_util;
-pub mod types;
+use crate::{compaction::Task, sst::SstFile};
-pub use error::{AnyhowError, Error, Result};
+pub struct TimeWindowCompactionStrategy { // Picker used by the scheduler's task-generation loop; the selection logic is not implemented yet.
+ segment_duration: Duration, // stored by `new`; not read anywhere yet
+}
+
+impl TimeWindowCompactionStrategy {
+    /// Creates a strategy for the given segment duration.
+    pub fn new(segment_duration: Duration) -> Self {
+        Self { segment_duration }
+    }
+
+    /// Picks the next compaction task from `_ssts`.
+    ///
+    /// Picking is not implemented yet, so this always returns `None`.
+    /// It must not panic: `Scheduler::generate_task_loop` calls it on every
+    /// tick, and a `todo!()` here would kill that background task the first
+    /// time it runs.
+    pub fn pick_candidate(&self, _ssts: Vec<SstFile>) -> Option<Task> {
+        // TODO: implement time-window picking based on `segment_duration`.
+        None
+    }
+}
diff --git a/horaedb/metric_engine/src/compaction/scheduler.rs
b/horaedb/metric_engine/src/compaction/scheduler.rs
new file mode 100644
index 00000000..4d311e49
--- /dev/null
+++ b/horaedb/metric_engine/src/compaction/scheduler.rs
@@ -0,0 +1,166 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{
+ sync::{atomic::AtomicU64, Arc},
+ time::Duration,
+};
+
+use anyhow::Context;
+use tokio::{
+ sync::mpsc::{self, Receiver, Sender},
+ task::JoinHandle,
+ time::sleep,
+};
+use tracing::warn;
+
+use crate::{
+ compaction::{picker::TimeWindowCompactionStrategy, Task},
+ manifest::ManifestRef,
+ sst::SstPathGenerator,
+ types::{ObjectStoreRef, RuntimeRef},
+ Result,
+};
+
+pub struct Scheduler { // Owns the two background loops spawned by `new` and the sending half of the task channel.
+ runtime: RuntimeRef, // runtime on which both loops were spawned
+
+ task_tx: Sender<Task>, // used by `try_send`; the picker loop holds its own clone
+ inused_memory: AtomicU64, // initialised to 0 in `new`; not read or updated yet
+ task_handle: JoinHandle<()>, // handle of the `recv_task_loop` task
+ picker_handle: JoinHandle<()>, // handle of the `generate_task_loop` task
+}
+
+impl Scheduler {
+ pub fn new( // Creates the task channel and spawns the receiver and picker loops on `runtime`.
+ runtime: RuntimeRef,
+ manifest: ManifestRef,
+ store: ObjectStoreRef,
+ segment_duration: Duration,
+ sst_path_gen: Arc<SstPathGenerator>,
+ config: SchedulerConfig,
+ ) -> Self {
+ let (task_tx, task_rx) =
mpsc::channel(config.max_pending_compaction_tasks); // bounded channel; capacity comes from the config
+ let task_handle = { // receiver loop
+ let rt = runtime.clone();
+ let store = store.clone();
+ let manifest = manifest.clone(); // clone here: the original `manifest` is moved into the picker loop below
+ runtime.spawn(async move {
+ Self::recv_task_loop(
+ rt,
+ task_rx,
+ store,
+ manifest,
+ sst_path_gen,
+ config.memory_limit,
+ )
+ .await;
+ })
+ };
+ let picker_handle = { // picker loop
+ let task_tx = task_tx.clone(); // the loop gets its own sender; the original stays in `Self`
+ let interval = config.schedule_interval;
+ runtime.spawn(async move {
+ Self::generate_task_loop(manifest, task_tx, interval,
segment_duration).await;
+ })
+ };
+
+ Self {
+ runtime,
+ task_tx,
+ task_handle,
+ picker_handle,
+ inused_memory: AtomicU64::new(0),
+ }
+ }
+
+ pub fn try_send(&self, task: Task) -> Result<()> { // Queues `task` without waiting; a rejected send is returned as an error with added context.
+ self.task_tx
+ .try_send(task)
+ .context("failed to send task to scheduler")?;
+
+ Ok(())
+ }
+
+ async fn recv_task_loop( // Receives tasks until `recv` yields `None`, running each one on its own spawned task.
+ rt: RuntimeRef,
+ mut task_rx: Receiver<Task>,
+ store: ObjectStoreRef,
+ manifest: ManifestRef,
+ _sst_path_gen: Arc<SstPathGenerator>, // unused for now
+ _mem_limit: u64, // unused for now
+ ) {
+ while let Some(task) = task_rx.recv().await {
+ let store = store.clone();
+ let manifest = manifest.clone();
+ rt.spawn(async move { // the handle is dropped, so this loop does not wait for the compaction to finish
+ let runner = Runner { store, manifest };
+ if let Err(e) = runner.do_compaction(task).await {
+ warn!("Do compaction failed, err:{e}"); // failures are only logged
+ }
+ });
+ }
+ }
+
+ async fn generate_task_loop( // Loops forever: snapshot all SSTs, ask the picker for a task, try to enqueue it, then sleep.
+ manifest: ManifestRef,
+ task_tx: Sender<Task>,
+ schedule_interval: Duration,
+ segment_duration: Duration,
+ ) {
+ let compactor = TimeWindowCompactionStrategy::new(segment_duration);
+ loop {
+ let ssts = manifest.all_ssts().await;
+ if let Some(task) = compactor.pick_candidate(ssts) {
+ if let Err(e) = task_tx.try_send(task) { // a rejected task is logged and dropped, not retried
+ warn!("Send task failed, err:{e}");
+ }
+ }
+
+ sleep(schedule_interval).await;
+ }
+ }
+}
+
+pub struct SchedulerConfig { // Tunables consumed by `Scheduler::new`.
+ pub schedule_interval: Duration, // pause between picker runs
+ pub memory_limit: u64, // forwarded to `recv_task_loop`, which ignores it for now; presumably bytes — confirm
+ pub max_pending_compaction_tasks: usize, // capacity of the task channel
+}
+
+impl Default for SchedulerConfig {
+    /// Defaults: a 30-second schedule interval, a memory limit of
+    /// `bytesize::gb(2)`, and at most 10 pending compaction tasks.
+    fn default() -> Self {
+        let schedule_interval = Duration::from_secs(30);
+        let memory_limit = bytesize::gb(2_u64);
+        let max_pending_compaction_tasks = 10;
+
+        Self {
+            schedule_interval,
+            memory_limit,
+            max_pending_compaction_tasks,
+        }
+    }
+}
+
+pub struct Runner { // Executes a single compaction `Task`; one is built per task in `Scheduler::recv_task_loop`.
+ store: ObjectStoreRef, // not used yet; `do_compaction` is still a stub
+ manifest: ManifestRef, // not used yet; `do_compaction` is still a stub
+}
+
+impl Runner {
+ // TODO: Merge input sst files into one new sst file
+ // and delete the expired sst files
+ async fn do_compaction(&self, _task: Task) -> Result<()> { // Not implemented: any call panics via `todo!()`.
+ todo!()
+ }
+}
diff --git a/horaedb/metric_engine/src/lib.rs b/horaedb/metric_engine/src/lib.rs
index e0dfdb3c..c77cdbb3 100644
--- a/horaedb/metric_engine/src/lib.rs
+++ b/horaedb/metric_engine/src/lib.rs
@@ -18,6 +18,7 @@
//! Storage Engine for metrics.
#![feature(duration_constructors)]
+mod compaction;
pub mod error;
mod macros;
mod manifest;
diff --git a/horaedb/metric_engine/src/manifest.rs
b/horaedb/metric_engine/src/manifest.rs
index e435e699..37646348 100644
--- a/horaedb/metric_engine/src/manifest.rs
+++ b/horaedb/metric_engine/src/manifest.rs
@@ -41,7 +41,7 @@ use tracing::error;
use crate::{
sst::{FileId, FileMeta, SstFile},
- types::{ManifestMergeOptions, ObjectStoreRef, TimeRange},
+ types::{ManifestMergeOptions, ObjectStoreRef, RuntimeRef, TimeRange},
AnyhowError, Error, Result,
};
@@ -49,6 +49,8 @@ pub const PREFIX_PATH: &str = "manifest";
pub const SNAPSHOT_FILENAME: &str = "snapshot";
pub const DELTA_PREFIX: &str = "delta";
+pub type ManifestRef = Arc<Manifest>;
+
pub struct Manifest {
delta_dir: Path,
store: ObjectStoreRef,
@@ -66,7 +68,7 @@ impl Payload {
// efficient
pub fn dedup_files(&mut self) {
let mut seen = HashSet::with_capacity(self.files.len());
- self.files.retain(|file| seen.insert(file.id));
+ self.files.retain(|file| seen.insert(file.id()));
}
}
@@ -103,8 +105,8 @@ impl Manifest {
runtime: Arc<Runtime>,
merge_options: ManifestMergeOptions,
) -> Result<Self> {
- let snapshot_path =
Path::from(format!("{root_dir}/{SNAPSHOT_FILENAME}"));
- let delta_dir = Path::from(format!("{root_dir}/{DELTA_PREFIX}"));
+ let snapshot_path =
Path::from(format!("{root_dir}/{PREFIX_PATH}/{SNAPSHOT_FILENAME}"));
+ let delta_dir =
Path::from(format!("{root_dir}/{PREFIX_PATH}/{DELTA_PREFIX}"));
let merger = ManifestMerger::try_new(
snapshot_path.clone(),
@@ -137,7 +139,7 @@ impl Manifest {
self.merger.maybe_schedule_merge().await?;
let new_sst_path = Path::from(format!("{}/{id}", self.delta_dir));
- let new_sst = SstFile { id, meta };
+ let new_sst = SstFile::new(id, meta);
let new_sst_payload = pb_types::SstFile::from(new_sst.clone());
let mut buf: Vec<u8> =
Vec::with_capacity(new_sst_payload.encoded_len());
@@ -164,13 +166,19 @@ impl Manifest {
Ok(())
}
+ // TODO: avoid clone
+ pub async fn all_ssts(&self) -> Vec<SstFile> {
+ let payload = self.payload.read().await;
+ payload.files.clone()
+ }
+
pub async fn find_ssts(&self, time_range: &TimeRange) -> Vec<SstFile> {
let payload = self.payload.read().await;
payload
.files
.iter()
- .filter(move |f| f.meta.time_range.overlaps(time_range))
+ .filter(move |f| f.meta().time_range.overlaps(time_range))
.cloned()
.collect()
}
@@ -185,7 +193,7 @@ struct ManifestMerger {
snapshot_path: Path,
delta_dir: Path,
store: ObjectStoreRef,
- runtime: Arc<Runtime>,
+ runtime: RuntimeRef,
sender: Sender<MergeType>,
receiver: RwLock<Receiver<MergeType>>,
deltas_num: AtomicUsize,
@@ -442,12 +450,12 @@ mod tests {
size: i as u32,
time_range,
};
- SstFile { id, meta }
+ SstFile::new(id, meta)
})
.collect::<Vec<_>>();
- expected_ssts.sort_by(|a, b| a.id.cmp(&b.id));
- ssts.sort_by(|a, b| a.id.cmp(&b.id));
+ expected_ssts.sort_by_key(|a| a.id());
+ ssts.sort_by_key(|a| a.id());
assert_eq!(expected_ssts, ssts);
}
@@ -458,8 +466,8 @@ mod tests {
.path()
.to_string_lossy()
.to_string();
- let snapshot_path =
Path::from(format!("{root_dir}/{SNAPSHOT_FILENAME}"));
- let delta_dir = Path::from(format!("{root_dir}/{DELTA_PREFIX}"));
+ let snapshot_path =
Path::from(format!("{root_dir}/{PREFIX_PATH}/{SNAPSHOT_FILENAME}"));
+ let delta_dir =
Path::from(format!("{root_dir}/{PREFIX_PATH}/{DELTA_PREFIX}"));
let runtime = tokio::runtime::Builder::new_multi_thread()
.worker_threads(4)
.enable_all()
@@ -497,8 +505,8 @@ mod tests {
let mut mem_ssts = manifest.payload.read().await.files.clone();
let mut ssts = read_snapshot(&store,
&snapshot_path).await.unwrap().files;
- mem_ssts.sort_by(|a, b| a.id.cmp(&b.id));
- ssts.sort_by(|a, b| a.id.cmp(&b.id));
+ mem_ssts.sort_by_key(|a| a.id());
+ ssts.sort_by_key(|a| a.id());
assert_eq!(mem_ssts, ssts);
let delta_paths = list_delta_paths(&store, &delta_dir).await.unwrap();
diff --git a/horaedb/metric_engine/src/sst.rs b/horaedb/metric_engine/src/sst.rs
index 644988f8..56c3a225 100644
--- a/horaedb/metric_engine/src/sst.rs
+++ b/horaedb/metric_engine/src/sst.rs
@@ -17,8 +17,8 @@
use std::{
sync::{
- atomic::{AtomicU64, Ordering},
- LazyLock,
+ atomic::{AtomicBool, AtomicU64, Ordering},
+ Arc, LazyLock,
},
time::SystemTime,
};
@@ -27,14 +27,50 @@ use macros::ensure;
use crate::{types::TimeRange, Error};
-pub const PREFIX_PATH: &str = "data";
+const PREFIX_PATH: &str = "data";
pub type FileId = u64;
-#[derive(Clone, Debug, PartialEq, Eq)]
+#[derive(Clone, Debug)]
pub struct SstFile {
- pub id: FileId,
- pub meta: FileMeta,
+ inner: Arc<Inner>,
+}
+
+#[derive(Debug)]
+struct Inner { // State held behind `SstFile`'s `Arc`, so every clone of an `SstFile` sees the same values.
+ id: FileId,
+ meta: FileMeta,
+
+ in_compaction: AtomicBool, // starts `false`; set to `true` by `SstFile::mark_compaction`
+}
+
+impl Inner {
+    /// Builds the shared state for an SST whose `in_compaction` flag starts
+    /// out `false`.
+    pub fn new(id: FileId, meta: FileMeta) -> Self {
+        let in_compaction = AtomicBool::new(false);
+        Self {
+            id,
+            meta,
+            in_compaction,
+        }
+    }
+}
+
+impl SstFile {
+ pub fn new(id: FileId, meta: FileMeta) -> Self { // Wraps a fresh `Inner` in an `Arc`, so later clones share one state.
+ let inner = Arc::new(Inner::new(id, meta));
+ Self { inner }
+ }
+
+ pub fn id(&self) -> FileId { // Returns the file id by value.
+ self.inner.id
+ }
+
+ pub fn meta(&self) -> &FileMeta { // Borrows the file metadata.
+ &self.inner.meta
+ }
+
+ pub fn mark_compaction(&self) { // Sets the shared `in_compaction` flag; nothing in this patch reads or clears it.
+ self.inner.in_compaction.store(true, Ordering::Relaxed);
+ }
}
impl TryFrom<pb_types::SstFile> for SstFile {
@@ -45,19 +81,27 @@ impl TryFrom<pb_types::SstFile> for SstFile {
let meta = value.meta.unwrap();
let meta = meta.try_into()?;
- Ok(Self { id: value.id, meta })
+ Ok(Self::new(value.id, meta))
}
}
impl From<SstFile> for pb_types::SstFile {
fn from(value: SstFile) -> Self {
pb_types::SstFile {
- id: value.id,
- meta: Some(value.meta.into()),
+ id: value.id(),
+ meta: Some(value.meta().clone().into()),
}
}
}
+/// Two `SstFile`s are equal when their ids and metadata match.
+///
+/// The `in_compaction` flag is not part of the comparison.
+impl PartialEq for SstFile {
+    fn eq(&self, other: &Self) -> bool {
+        // Compare against `other` on both sides; comparing `self.meta()` with
+        // itself would make the metadata check vacuous.
+        self.id() == other.id() && self.meta() == other.meta()
+    }
+}
+
+impl Eq for SstFile {}
+
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct FileMeta {
pub max_sequence: u64,
@@ -112,3 +156,18 @@ static NEXT_ID: LazyLock<AtomicU64> = LazyLock::new(|| {
pub fn allocate_id() -> u64 {
NEXT_ID.fetch_add(1, Ordering::SeqCst)
}
+
+#[derive(Debug, Clone)]
+pub struct SstPathGenerator { // Builds SST object paths under a fixed prefix.
+ prefix: String, // first component of every path returned by `generate`
+}
+
+impl SstPathGenerator {
+    /// Creates a generator whose paths all start with `prefix`.
+    pub fn new(prefix: String) -> Self {
+        Self { prefix }
+    }
+
+    /// Returns the path for SST `id`, laid out as
+    /// `<prefix>/<PREFIX_PATH>/<id>.sst`.
+    pub fn generate(&self, id: FileId) -> String {
+        let Self { prefix } = self;
+        format!("{}/{}/{}.sst", prefix, PREFIX_PATH, id)
+    }
+}
diff --git a/horaedb/metric_engine/src/storage.rs
b/horaedb/metric_engine/src/storage.rs
index ac37e9c8..e30cb2a8 100644
--- a/horaedb/metric_engine/src/storage.rs
+++ b/horaedb/metric_engine/src/storage.rs
@@ -57,9 +57,10 @@ use parquet::{
use tokio::runtime::Runtime;
use crate::{
- manifest::Manifest,
+ compaction::{CompactionScheduler, SchedulerConfig},
+ manifest::{Manifest, ManifestRef},
read::{DefaultParquetFileReaderFactory, MergeExec},
- sst::{allocate_id, FileId, FileMeta, SstFile},
+ sst::{allocate_id, FileMeta, SstFile, SstPathGenerator},
types::{
ObjectStoreRef, RuntimeOptions, StorageOptions, TimeRange,
WriteOptions, WriteResult,
SEQ_COLUMN_NAME,
@@ -97,21 +98,31 @@ pub trait TimeMergeStorage {
async fn compact(&self, req: CompactRequest) -> Result<()>;
}
+#[derive(Clone)]
struct StorageRuntimes {
- compact_runtime: Arc<Runtime>,
+ manifest_compact_runtime: Arc<Runtime>,
+ sst_compact_runtime: Arc<Runtime>,
}
impl StorageRuntimes {
pub fn new(runtime_opts: RuntimeOptions) -> Result<Self> {
- let compact_runtime = tokio::runtime::Builder::new_multi_thread()
- .thread_name("storage-compact")
- .worker_threads(runtime_opts.compact_thread_num)
+ let manifest_compact_runtime =
tokio::runtime::Builder::new_multi_thread()
+ .thread_name("man-compact")
+ .worker_threads(runtime_opts.manifest_compact_thread_num)
+ .enable_all()
+ .build()
+ .context("build storgae compact runtime")?;
+
+ let sst_compact_runtime = tokio::runtime::Builder::new_multi_thread()
+ .thread_name("sst-compact")
+ .worker_threads(runtime_opts.sst_compact_thread_num)
.enable_all()
.build()
.context("build storgae compact runtime")?;
Ok(Self {
- compact_runtime: Arc::new(compact_runtime),
+ manifest_compact_runtime: Arc::new(manifest_compact_runtime),
+ sst_compact_runtime: Arc::new(sst_compact_runtime),
})
}
}
@@ -127,11 +138,13 @@ pub struct CloudObjectStorage {
store: ObjectStoreRef,
arrow_schema: SchemaRef,
num_primary_keys: usize,
- manifest: Manifest,
+ manifest: ManifestRef,
runtimes: StorageRuntimes,
df_schema: DFSchema,
write_props: WriterProperties,
+ sst_path_gen: Arc<SstPathGenerator>,
+ compact_scheduler: CompactionScheduler,
}
/// It will organize the data in the following way:
@@ -154,16 +167,17 @@ impl CloudObjectStorage {
num_primary_keys: usize,
storage_opts: StorageOptions,
) -> Result<Self> {
- let manifest_prefix = crate::manifest::PREFIX_PATH;
let runtimes = StorageRuntimes::new(storage_opts.runtime_opts)?;
- let manifest = Manifest::try_new(
- format!("{path}/{manifest_prefix}"),
- store.clone(),
- runtimes.compact_runtime.clone(),
- storage_opts.manifest_merge_opts,
- )
- .await?;
+ let manifest = Arc::new(
+ Manifest::try_new(
+ path.clone(),
+ store.clone(),
+ runtimes.manifest_compact_runtime.clone(),
+ storage_opts.manifest_merge_opts,
+ )
+ .await?,
+ );
let mut new_fields = arrow_schema.fields.clone().to_vec();
new_fields.push(Arc::new(Field::new(
SEQ_COLUMN_NAME,
@@ -176,6 +190,15 @@ impl CloudObjectStorage {
));
let df_schema =
DFSchema::try_from(arrow_schema.clone()).context("build DFSchema")?;
let write_props = Self::build_write_props(storage_opts.write_opts,
num_primary_keys);
+ let sst_path_gen = Arc::new(SstPathGenerator::new(path.clone()));
+ let compact_scheduler = CompactionScheduler::new(
+ runtimes.sst_compact_runtime.clone(),
+ manifest.clone(),
+ store.clone(),
+ segment_duration,
+ sst_path_gen.clone(),
+ SchedulerConfig::default(),
+ );
Ok(Self {
path,
num_primary_keys,
@@ -186,18 +209,14 @@ impl CloudObjectStorage {
runtimes,
df_schema,
write_props,
+ sst_path_gen,
+ compact_scheduler,
})
}
- fn build_file_path(&self, id: FileId) -> String {
- let root = &self.path;
- let prefix = crate::sst::PREFIX_PATH;
- format!("{root}/{prefix}/{id}")
- }
-
async fn write_batch(&self, batch: RecordBatch) -> Result<WriteResult> {
let file_id = allocate_id();
- let file_path = self.build_file_path(file_id);
+ let file_path = self.sst_path_gen.generate(file_id);
let file_path = Path::from(file_path);
let object_store_writer = ParquetObjectWriter::new(self.store.clone(),
file_path.clone());
let mut writer = AsyncArrowWriter::try_new(
@@ -324,8 +343,8 @@ impl CloudObjectStorage {
.into_iter()
.map(|f| {
vec![PartitionedFile::new(
- self.build_file_path(f.id),
- f.meta.size as u64,
+ self.sst_path_gen.generate(f.id()),
+ f.meta().size as u64,
)]
})
.collect::<Vec<_>>();
@@ -405,7 +424,7 @@ impl TimeMergeStorage for CloudObjectStorage {
}
let ssts_by_segment = total_ssts.into_iter().group_by(|file| {
- file.meta.time_range.start.0 / self.segment_duration.as_millis()
as i64
+ file.meta().time_range.start.0 / self.segment_duration.as_millis()
as i64
});
let mut plan_for_all_segments = Vec::new();
@@ -429,7 +448,7 @@ impl TimeMergeStorage for CloudObjectStorage {
return Ok(res);
}
- async fn compact(&self, req: CompactRequest) -> Result<()> {
+ async fn compact(&self, _req: CompactRequest) -> Result<()> {
todo!()
}
}
@@ -460,14 +479,16 @@ mod tests {
let plan = storage
.build_scan_plan(
(100..103)
- .map(|id| SstFile {
- id,
- meta: FileMeta {
- max_sequence: id,
- num_rows: 1,
- size: 1,
- time_range: (1..10).into(),
- },
+ .map(|id| {
+ SstFile::new(
+ id,
+ FileMeta {
+ max_sequence: id,
+ num_rows: 1,
+ size: 1,
+ time_range: (1..10).into(),
+ },
+ )
})
.collect(),
None,
@@ -481,7 +502,7 @@ mod tests {
assert_eq!(
r#"MergeExec: [primary_keys: 1, seq_idx: 1]
SortPreservingMergeExec: [pk1@0 ASC, __seq__@1 ASC], fetch=1024
- ParquetExec: file_groups={3 groups: [[mock/data/100], [mock/data/101],
[mock/data/102]]}, projection=[pk1, __seq__], output_orderings=[[pk1@0 ASC,
__seq__@1 ASC], [pk1@0 ASC, __seq__@1 ASC], [pk1@0 ASC, __seq__@1 ASC]]
+ ParquetExec: file_groups={3 groups: [[mock/data/100.sst],
[mock/data/101.sst], [mock/data/102.sst]]}, projection=[pk1, __seq__],
output_orderings=[[pk1@0 ASC, __seq__@1 ASC], [pk1@0 ASC, __seq__@1 ASC],
[pk1@0 ASC, __seq__@1 ASC]]
"#,
format!("{display_plan}")
);
diff --git a/horaedb/metric_engine/src/types.rs
b/horaedb/metric_engine/src/types.rs
index 98ba9764..8e7c994e 100644
--- a/horaedb/metric_engine/src/types.rs
+++ b/horaedb/metric_engine/src/types.rs
@@ -23,6 +23,7 @@ use std::{
use object_store::ObjectStore;
use parquet::basic::{Compression, Encoding, ZstdLevel};
+use tokio::runtime::Runtime;
use crate::sst::FileId;
@@ -30,6 +31,8 @@ use crate::sst::FileId;
// user-defined schema.
pub const SEQ_COLUMN_NAME: &str = "__seq__";
+pub type RuntimeRef = Arc<Runtime>;
+
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct Timestamp(pub i64);
@@ -148,13 +151,15 @@ impl Default for WriteOptions {
}
pub struct RuntimeOptions {
- pub compact_thread_num: usize,
+ pub manifest_compact_thread_num: usize,
+ pub sst_compact_thread_num: usize,
}
impl Default for RuntimeOptions {
fn default() -> Self {
Self {
- compact_thread_num: 4,
+ manifest_compact_thread_num: 2,
+ sst_compact_thread_num: 4,
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]