This is an automated email from the ASF dual-hosted git repository.
baojinri pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/horaedb.git
The following commit(s) were added to refs/heads/main by this push:
new 1331e0a3 feat: add compaction runner (#1609)
1331e0a3 is described below
commit 1331e0a3d114d770abec071af7229b5281da6abc
Author: Jiacai Liu <[email protected]>
AuthorDate: Tue Dec 17 11:06:16 2024 +0800
feat: add compaction runner (#1609)
## Rationale
The compaction runner is responsible for compacting old SST files and deleting
expired SST files.
## Detailed Changes
## Test Plan
CI
---
Cargo.lock | 14 ++
Cargo.toml | 1 +
Makefile | 2 +-
src/metric_engine/Cargo.toml | 1 +
src/metric_engine/src/compaction/mod.rs | 1 +
src/metric_engine/src/compaction/runner.rs | 172 ++++++++++++++++++++
src/metric_engine/src/compaction/scheduler.rs | 47 +++---
src/metric_engine/src/manifest.rs | 14 ++
src/metric_engine/src/read.rs | 178 +++++++++++++++++++--
src/metric_engine/src/storage.rs | 216 ++++++++------------------
src/metric_engine/src/types.rs | 17 +-
11 files changed, 475 insertions(+), 188 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index d484dea6..2c660fc3 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1677,6 +1677,7 @@ dependencies = [
"thiserror",
"tokio",
"tracing",
+ "uuid",
]
[[package]]
@@ -2738,6 +2739,19 @@ source =
"registry+https://github.com/rust-lang/crates.io-index"
checksum = "81dfa00651efa65069b0b6b651f4aaa31ba9e3c3ce0137aaad053604ee7e0314"
dependencies = [
"getrandom",
+ "rand",
+ "uuid-macro-internal",
+]
+
+[[package]]
+name = "uuid-macro-internal"
+version = "1.11.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6b91f57fe13a38d0ce9e28a03463d8d3c2468ed03d75375110ec71d93b449a08"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
]
[[package]]
diff --git a/Cargo.toml b/Cargo.toml
index 4be3a049..4d121b50 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -50,6 +50,7 @@ tracing = "0.1"
tracing-subscriber = "0.3"
async-scoped = { version = "0.9.0", features = ["use-tokio"] }
test-log = "0.2"
+uuid = { version = "1" }
# This profile optimizes for good runtime performance.
[profile.release]
diff --git a/Makefile b/Makefile
index 915073d5..b39aeb18 100644
--- a/Makefile
+++ b/Makefile
@@ -48,7 +48,7 @@ udeps:
clippy:
cd $(DIR); cargo clippy --all-targets --all-features --workspace -- -D
warnings -D clippy::dbg-macro \
- -A dead_code -A unused_variables -A clippy::unreachable # Remove these
once we have a clean build
+ -A dead_code -A unused_variables -A clippy::unreachable -A
clippy::too_many_arguments # Remove these once we have a clean build
ensure-disk-quota:
bash ./scripts/free-disk-space.sh
diff --git a/src/metric_engine/Cargo.toml b/src/metric_engine/Cargo.toml
index e84c4087..ca8dc576 100644
--- a/src/metric_engine/Cargo.toml
+++ b/src/metric_engine/Cargo.toml
@@ -50,6 +50,7 @@ prost = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
+uuid = { workspace = true, features = ["v4", "fast-rng", "macro-diagnostics"] }
[dev-dependencies]
temp-dir = { workspace = true }
diff --git a/src/metric_engine/src/compaction/mod.rs
b/src/metric_engine/src/compaction/mod.rs
index a2037db5..62beda0a 100644
--- a/src/metric_engine/src/compaction/mod.rs
+++ b/src/metric_engine/src/compaction/mod.rs
@@ -16,6 +16,7 @@
// under the License.
mod picker;
+mod runner;
mod scheduler;
pub use scheduler::{Scheduler as CompactionScheduler, SchedulerConfig};
diff --git a/src/metric_engine/src/compaction/runner.rs
b/src/metric_engine/src/compaction/runner.rs
new file mode 100644
index 00000000..ce6f170e
--- /dev/null
+++ b/src/metric_engine/src/compaction/runner.rs
@@ -0,0 +1,172 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use anyhow::Context;
+use arrow::array::{RecordBatch, UInt64Array};
+use async_scoped::TokioScope;
+use datafusion::{execution::TaskContext, physical_plan::execute_stream};
+use futures::StreamExt;
+use object_store::path::Path;
+use parquet::{
+ arrow::{async_writer::ParquetObjectWriter, AsyncArrowWriter},
+ file::properties::WriterProperties,
+};
+use tracing::error;
+
+use crate::{
+ compaction::Task,
+ manifest::ManifestRef,
+ read::ParquetReader,
+ sst::{allocate_id, FileMeta, SstPathGenerator},
+ types::{ObjectStoreRef, StorageSchema},
+ Result,
+};
+
+#[derive(Clone)]
+pub struct Runner {
+ store: ObjectStoreRef,
+ schema: StorageSchema,
+ manifest: ManifestRef,
+ sst_path_gen: Arc<SstPathGenerator>,
+ parquet_reader: Arc<ParquetReader>,
+ write_props: WriterProperties,
+}
+
+impl Runner {
+ pub fn new(
+ store: ObjectStoreRef,
+ schema: StorageSchema,
+ manifest: ManifestRef,
+ sst_path_gen: Arc<SstPathGenerator>,
+ parquet_reader: Arc<ParquetReader>,
+ write_props: WriterProperties,
+ ) -> Self {
+ Self {
+ store,
+ schema,
+ manifest,
+ sst_path_gen,
+ parquet_reader,
+ write_props,
+ }
+ }
+
+ // TODO: Merge input sst files into one new sst file
+ // and delete the expired sst files
+ pub async fn do_compaction(&self, task: Task) -> Result<()> {
+ assert!(!task.inputs.is_empty());
+ for f in &task.inputs {
+ assert!(f.is_compaction());
+ }
+ for f in &task.expireds {
+ assert!(f.is_compaction());
+ }
+ let mut time_range = task.inputs[0].meta().time_range.clone();
+ for f in &task.inputs[1..] {
+ time_range.merge(&f.meta().time_range);
+ }
+ let plan = self
+ .parquet_reader
+ .build_df_plan(task.inputs.clone(), None, Vec::new())?;
+ let mut stream = execute_stream(plan, Arc::new(TaskContext::default()))
+ .context("execute datafusion plan")?;
+
+ let file_id = allocate_id();
+ let file_path = self.sst_path_gen.generate(file_id);
+ let file_path = Path::from(file_path);
+ let object_store_writer = ParquetObjectWriter::new(self.store.clone(),
file_path.clone());
+ let mut writer = AsyncArrowWriter::try_new(
+ object_store_writer,
+ self.schema.arrow_schema.clone(),
+ Some(self.write_props.clone()),
+ )
+ .context("create arrow writer")?;
+ let mut num_rows = 0;
+ // TODO: support multi-part write
+ while let Some(batch) = stream.next().await {
+ let batch = batch.context("execute plan")?;
+ num_rows += batch.num_rows();
+ let batch_with_seq = {
+ let mut new_cols = batch.columns().to_vec();
+ // Since file_id in increasing order, we can use it as
sequence.
+ let seq_column = Arc::new(UInt64Array::from(vec![file_id;
batch.num_rows()]));
+ new_cols.push(seq_column);
+ RecordBatch::try_new(self.schema.arrow_schema.clone(),
new_cols)
+ .context("construct record batch with seq column")?
+ };
+
+ writer.write(&batch_with_seq).await.context("write batch")?;
+ }
+ writer.close().await.context("close writer")?;
+ let object_meta = self
+ .store
+ .head(&file_path)
+ .await
+ .context("get object meta")?;
+ let file_meta = FileMeta {
+ max_sequence: file_id,
+ num_rows: num_rows as u32,
+ size: object_meta.size as u32,
+ time_range: time_range.clone(),
+ };
+ // First add new sst to manifest, then delete expired/old sst
+ self.manifest.add_file(file_id, file_meta).await?;
+ self.manifest
+ .add_tombstone_files(task.expireds.clone())
+ .await?;
+ self.manifest
+ .add_tombstone_files(task.inputs.clone())
+ .await?;
+
+ let (_, results) = TokioScope::scope_and_block(|scope| {
+ for file in task.expireds {
+ let path = Path::from(self.sst_path_gen.generate(file.id()));
+ scope.spawn(async move {
+ self.store
+ .delete(&path)
+ .await
+ .with_context(|| format!("failed to delete file,
path:{path}"))
+ });
+ }
+ for file in task.inputs {
+ let path = Path::from(self.sst_path_gen.generate(file.id()));
+ scope.spawn(async move {
+ self.store
+ .delete(&path)
+ .await
+ .with_context(|| format!("failed to delete file,
path:{path}"))
+ });
+ }
+ });
+ for res in results {
+ match res {
+ Err(e) => {
+ error!("Failed to join delete task, err:{e}")
+ }
+ Ok(v) => {
+ if let Err(e) = v {
+ error!("Failed to delete sst, err:{e}")
+ }
+ }
+ }
+ }
+
+ Ok(())
+ }
+}
diff --git a/src/metric_engine/src/compaction/scheduler.rs
b/src/metric_engine/src/compaction/scheduler.rs
index 97a16ce2..5e2c407a 100644
--- a/src/metric_engine/src/compaction/scheduler.rs
+++ b/src/metric_engine/src/compaction/scheduler.rs
@@ -21,6 +21,7 @@ use std::{
};
use anyhow::Context;
+use parquet::file::properties::WriterProperties;
use tokio::{
sync::mpsc::{self, Receiver, Sender},
task::JoinHandle,
@@ -28,11 +29,13 @@ use tokio::{
};
use tracing::warn;
+use super::runner::Runner;
use crate::{
compaction::{picker::TimeWindowCompactionStrategy, Task},
manifest::ManifestRef,
+ read::ParquetReader,
sst::SstPathGenerator,
- types::{ObjectStoreRef, RuntimeRef},
+ types::{ObjectStoreRef, RuntimeRef, StorageSchema},
Result,
};
@@ -50,8 +53,10 @@ impl Scheduler {
runtime: RuntimeRef,
manifest: ManifestRef,
store: ObjectStoreRef,
+ schema: StorageSchema,
segment_duration: Duration,
sst_path_gen: Arc<SstPathGenerator>,
+ parquet_reader: Arc<ParquetReader>,
config: SchedulerConfig,
) -> Self {
let (task_tx, task_rx) =
mpsc::channel(config.max_pending_compaction_tasks);
@@ -59,14 +64,18 @@ impl Scheduler {
let rt = runtime.clone();
let store = store.clone();
let manifest = manifest.clone();
+ let write_props = config.write_props.clone();
runtime.spawn(async move {
Self::recv_task_loop(
rt,
task_rx,
store,
+ schema,
manifest,
sst_path_gen,
+ parquet_reader,
config.memory_limit,
+ write_props,
)
.await;
})
@@ -99,15 +108,24 @@ impl Scheduler {
rt: RuntimeRef,
mut task_rx: Receiver<Task>,
store: ObjectStoreRef,
+ schema: StorageSchema,
manifest: ManifestRef,
- _sst_path_gen: Arc<SstPathGenerator>,
+ sst_path_gen: Arc<SstPathGenerator>,
+ parquet_reader: Arc<ParquetReader>,
_mem_limit: u64,
+ write_props: WriterProperties,
) {
+ let runner = Runner::new(
+ store,
+ schema,
+ manifest,
+ sst_path_gen,
+ parquet_reader,
+ write_props,
+ );
while let Some(task) = task_rx.recv().await {
- let store = store.clone();
- let manifest = manifest.clone();
+ let runner = runner.clone();
rt.spawn(async move {
- let runner = Runner { store, manifest };
if let Err(e) = runner.do_compaction(task).await {
warn!("Do compaction failed, err:{e}");
}
@@ -121,8 +139,8 @@ impl Scheduler {
segment_duration: Duration,
config: SchedulerConfig,
) {
- let compactor = TimeWindowCompactionStrategy::new(segment_duration,
config);
let schedule_interval = config.schedule_interval;
+ let compactor = TimeWindowCompactionStrategy::new(segment_duration,
config);
// TODO: obtain expire time
let expire_time = None;
loop {
@@ -138,12 +156,13 @@ impl Scheduler {
}
}
-#[derive(Clone, Copy)]
+#[derive(Clone)]
pub struct SchedulerConfig {
pub schedule_interval: Duration,
pub memory_limit: u64,
pub max_pending_compaction_tasks: usize,
pub compaction_files_limit: usize,
+ pub write_props: WriterProperties,
}
impl Default for SchedulerConfig {
@@ -153,19 +172,7 @@ impl Default for SchedulerConfig {
memory_limit: bytesize::gb(2_u64),
max_pending_compaction_tasks: 10,
compaction_files_limit: 10,
+ write_props: WriterProperties::default(),
}
}
}
-
-pub struct Runner {
- store: ObjectStoreRef,
- manifest: ManifestRef,
-}
-
-impl Runner {
- // TODO: Merge input sst files into one new sst file
- // and delete the expired sst files
- async fn do_compaction(&self, _task: Task) -> Result<()> {
- todo!()
- }
-}
diff --git a/src/metric_engine/src/manifest.rs
b/src/metric_engine/src/manifest.rs
index 0cb7f51f..1a50b90e 100644
--- a/src/metric_engine/src/manifest.rs
+++ b/src/metric_engine/src/manifest.rs
@@ -41,6 +41,7 @@ use tokio::{
},
};
use tracing::error;
+use uuid::Uuid;
use crate::{
ensure,
@@ -52,11 +53,13 @@ use crate::{
pub const PREFIX_PATH: &str = "manifest";
pub const SNAPSHOT_FILENAME: &str = "snapshot";
pub const DELTA_PREFIX: &str = "delta";
+pub const TOMBSTONE_PREFIX: &str = "tombstone";
pub type ManifestRef = Arc<Manifest>;
pub struct Manifest {
delta_dir: Path,
+ tombstone_dir: Path,
store: ObjectStoreRef,
merger: Arc<ManifestMerger>,
@@ -126,6 +129,7 @@ impl Manifest {
) -> Result<Self> {
let snapshot_path =
Path::from(format!("{root_dir}/{PREFIX_PATH}/{SNAPSHOT_FILENAME}"));
let delta_dir =
Path::from(format!("{root_dir}/{PREFIX_PATH}/{DELTA_PREFIX}"));
+ let tombstone_dir =
Path::from(format!("{root_dir}/{PREFIX_PATH}/{TOMBSTONE_PREFIX}"));
let merger = ManifestMerger::try_new(
snapshot_path.clone(),
@@ -150,6 +154,7 @@ impl Manifest {
Ok(Self {
delta_dir,
+ tombstone_dir,
store,
merger,
payload: RwLock::new(payload),
@@ -187,6 +192,15 @@ impl Manifest {
Ok(())
}
+ // TODO: recovery tombstone files when startup
+ pub async fn add_tombstone_files<I>(&self, files: I) -> Result<()>
+ where
+ I: IntoIterator<Item = SstFile>,
+ {
+ let path = Path::from(format!("{}/{}", self.tombstone_dir,
Uuid::new_v4()));
+ todo!()
+ }
+
// TODO: avoid clone
pub async fn all_ssts(&self) -> Vec<SstFile> {
let payload = self.payload.read().await;
diff --git a/src/metric_engine/src/read.rs b/src/metric_engine/src/read.rs
index 7769ff21..1a5abc48 100644
--- a/src/metric_engine/src/read.rs
+++ b/src/metric_engine/src/read.rs
@@ -28,14 +28,25 @@ use arrow::{
};
use arrow_schema::SchemaRef;
use datafusion::{
- common::internal_err,
- datasource::physical_plan::{FileMeta, ParquetFileReaderFactory},
+ common::{internal_err, DFSchema},
+ datasource::{
+ listing::PartitionedFile,
+ physical_plan::{FileMeta, FileScanConfig, ParquetExec,
ParquetFileReaderFactory},
+ },
error::{DataFusionError, Result as DfResult},
- execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext},
+ execution::{
+ context::ExecutionProps, object_store::ObjectStoreUrl,
RecordBatchStream,
+ SendableRecordBatchStream, TaskContext,
+ },
+ logical_expr::utils::conjunction,
parquet::arrow::async_reader::AsyncFileReader,
+ physical_expr::{create_physical_expr, LexOrdering},
physical_plan::{
- metrics::ExecutionPlanMetricsSet, DisplayAs, Distribution,
ExecutionPlan, PlanProperties,
+ metrics::ExecutionPlanMetricsSet,
sorts::sort_preserving_merge::SortPreservingMergeExec,
+ DisplayAs, Distribution, ExecutionPlan, PlanProperties,
},
+ physical_planner::create_physical_sort_exprs,
+ prelude::{ident, Expr},
};
use futures::{Stream, StreamExt};
use itertools::Itertools;
@@ -44,8 +55,9 @@ use tracing::debug;
use crate::{
compare_primitive_columns,
- operator::MergeOperator,
- types::{ObjectStoreRef, SEQ_COLUMN_NAME},
+ operator::{BytesMergeOperator, LastValueOperator, MergeOperator,
MergeOperatorRef},
+ sst::{SstFile, SstPathGenerator},
+ types::{ObjectStoreRef, StorageSchema, UpdateMode, SEQ_COLUMN_NAME},
Result,
};
@@ -183,7 +195,7 @@ struct MergeStream {
stream: SendableRecordBatchStream,
num_primary_keys: usize,
seq_idx: usize,
- value_operator: Arc<dyn MergeOperator>,
+ value_operator: MergeOperatorRef,
pending_batch: Option<RecordBatch>,
arrow_schema: SchemaRef,
@@ -194,7 +206,7 @@ impl MergeStream {
stream: SendableRecordBatchStream,
num_primary_keys: usize,
seq_idx: usize,
- value_operator: Arc<dyn MergeOperator>,
+ value_operator: MergeOperatorRef,
) -> Self {
let fields = stream
.schema()
@@ -356,14 +368,111 @@ impl RecordBatchStream for MergeStream {
}
}
+pub struct ParquetReader {
+ store: ObjectStoreRef,
+ schema: StorageSchema,
+ sst_path_gen: Arc<SstPathGenerator>,
+}
+
+impl ParquetReader {
+ pub fn new(
+ store: ObjectStoreRef,
+ schema: StorageSchema,
+ sst_path_gen: Arc<SstPathGenerator>,
+ ) -> Self {
+ Self {
+ store,
+ schema,
+ sst_path_gen,
+ }
+ }
+
+ fn build_sort_exprs(&self, df_schema: &DFSchema, sort_seq: bool) ->
Result<LexOrdering> {
+ let mut sort_exprs = (0..self.schema.num_primary_keys)
+ .map(|i| {
+ ident(self.schema.arrow_schema.field(i).name())
+ .sort(true /* asc */, true /* nulls_first */)
+ })
+ .collect::<Vec<_>>();
+ if sort_seq {
+ sort_exprs.push(ident(SEQ_COLUMN_NAME).sort(true, true));
+ }
+ let sort_exprs =
+ create_physical_sort_exprs(&sort_exprs, df_schema,
&ExecutionProps::default())
+ .context("create physical sort exprs")?;
+
+ Ok(sort_exprs)
+ }
+
+ pub fn build_df_plan(
+ &self,
+ ssts: Vec<SstFile>,
+ projections: Option<Vec<usize>>,
+ predicates: Vec<Expr>,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ // we won't use url for selecting object_store.
+ let dummy_url = ObjectStoreUrl::parse("empty://").unwrap();
+ let df_schema =
+
DFSchema::try_from(self.schema.arrow_schema.clone()).context("build DFSchema")?;
+ let sort_exprs = self.build_sort_exprs(&df_schema, true /* sort_seq
*/)?;
+
+ let file_groups = ssts
+ .into_iter()
+ .map(|f| {
+ vec![PartitionedFile::new(
+ self.sst_path_gen.generate(f.id()),
+ f.meta().size as u64,
+ )]
+ })
+ .collect::<Vec<_>>();
+ let scan_config = FileScanConfig::new(dummy_url,
self.schema.arrow_schema.clone())
+ .with_output_ordering(vec![sort_exprs.clone(); file_groups.len()])
+ .with_file_groups(file_groups)
+ .with_projection(projections);
+
+ let mut builder =
ParquetExec::builder(scan_config).with_parquet_file_reader_factory(
+ Arc::new(DefaultParquetFileReaderFactory::new(self.store.clone())),
+ );
+ if let Some(expr) = conjunction(predicates) {
+ let filters = create_physical_expr(&expr, &df_schema,
&ExecutionProps::new())
+ .context("create physical expr")?;
+ builder = builder.with_predicate(filters);
+ }
+
+ // TODO: fetch using multiple threads since read from parquet will
incur CPU
+ // when convert between arrow and parquet.
+ let parquet_exec = builder.build();
+ let sort_exec = SortPreservingMergeExec::new(sort_exprs,
Arc::new(parquet_exec))
+ // TODO: make fetch size configurable.
+ .with_fetch(Some(1024))
+ .with_round_robin_repartition(true);
+
+ let merge_exec = MergeExec::new(
+ Arc::new(sort_exec),
+ self.schema.num_primary_keys,
+ self.schema.seq_idx,
+ match self.schema.update_mode {
+ UpdateMode::Overwrite => Arc::new(LastValueOperator),
+ UpdateMode::Append => {
+
Arc::new(BytesMergeOperator::new(self.schema.value_idxes.clone()))
+ }
+ },
+ );
+ Ok(Arc::new(merge_exec))
+ }
+}
+
#[cfg(test)]
mod tests {
+ use object_store::local::LocalFileSystem;
use test_log::test;
use super::*;
use crate::{
+ arrow_schema,
operator::{BytesMergeOperator, LastValueOperator, MergeOperatorRef},
record_batch,
+ sst::FileMeta,
test_util::{check_stream, make_sendable_record_batches},
};
@@ -379,7 +488,7 @@ mod tests {
record_batch!(("pk1", UInt8, vec![14]), ("value", Binary,
vec![b"9"])).unwrap(),
];
- test_merge_stream_for_append_mode(Arc::new(LastValueOperator),
expected).await;
+ test_merge_stream_inner(Arc::new(LastValueOperator), expected).await;
let expected = [
record_batch!(
@@ -391,11 +500,10 @@ mod tests {
record_batch!(("pk1", UInt8, vec![14]), ("value", Binary,
vec![b"9"])).unwrap(),
];
-
test_merge_stream_for_append_mode(Arc::new(BytesMergeOperator::new(vec![1])),
expected)
- .await;
+ test_merge_stream_inner(Arc::new(BytesMergeOperator::new(vec![1])),
expected).await;
}
- async fn test_merge_stream_for_append_mode<I>(merge_op: MergeOperatorRef,
expected: I)
+ async fn test_merge_stream_inner<I>(merge_op: MergeOperatorRef, expected:
I)
where
I: IntoIterator<Item = RecordBatch>,
{
@@ -423,4 +531,50 @@ mod tests {
let stream = MergeStream::new(stream, 1, 2, merge_op);
check_stream(Box::pin(stream), expected).await;
}
+
+ #[tokio::test]
+ async fn test_build_scan_plan() {
+ let schema = arrow_schema!(("pk1", UInt8), ("value", UInt8),
(SEQ_COLUMN_NAME, UInt64));
+ let store = Arc::new(LocalFileSystem::new());
+ let reader = ParquetReader::new(
+ store,
+ StorageSchema {
+ arrow_schema: schema.clone(),
+ num_primary_keys: 1,
+ seq_idx: 2,
+ value_idxes: vec![1],
+ update_mode: UpdateMode::Overwrite,
+ },
+ Arc::new(SstPathGenerator::new("mock".to_string())),
+ );
+ let plan = reader
+ .build_df_plan(
+ (100..103)
+ .map(|id| {
+ SstFile::new(
+ id,
+ FileMeta {
+ max_sequence: id,
+ num_rows: 1,
+ size: 1,
+ time_range: (1..10).into(),
+ },
+ )
+ })
+ .collect(),
+ None,
+ vec![],
+ )
+ .unwrap();
+ let display_plan =
+
datafusion::physical_plan::display::DisplayableExecutionPlan::new(plan.as_ref())
+ .indent(true);
+ assert_eq!(
+ r#"MergeExec: [primary_keys: 1, seq_idx: 2]
+ SortPreservingMergeExec: [pk1@0 ASC, __seq__@2 ASC], fetch=1024
+ ParquetExec: file_groups={3 groups: [[mock/data/100.sst],
[mock/data/101.sst], [mock/data/102.sst]]}, projection=[pk1, value, __seq__],
output_orderings=[[pk1@0 ASC, __seq__@2 ASC], [pk1@0 ASC, __seq__@2 ASC],
[pk1@0 ASC, __seq__@2 ASC]]
+"#,
+ format!("{display_plan}")
+ );
+ }
}
diff --git a/src/metric_engine/src/storage.rs b/src/metric_engine/src/storage.rs
index 0aa25361..37cc904e 100644
--- a/src/metric_engine/src/storage.rs
+++ b/src/metric_engine/src/storage.rs
@@ -25,20 +25,14 @@ use arrow::{
use arrow_schema::{DataType, Field, Schema};
use async_trait::async_trait;
use datafusion::{
+ self,
common::DFSchema,
- datasource::{
- listing::PartitionedFile,
- physical_plan::{FileScanConfig, ParquetExec},
- },
- execution::{context::ExecutionProps, object_store::ObjectStoreUrl,
SendableRecordBatchStream},
- logical_expr::{utils::conjunction, Expr},
- physical_expr::{create_physical_expr, LexOrdering},
+ execution::{context::ExecutionProps, SendableRecordBatchStream},
+ logical_expr::Expr,
+ physical_expr::LexOrdering,
physical_plan::{
- execute_stream,
- memory::MemoryExec,
- sorts::{sort::SortExec,
sort_preserving_merge::SortPreservingMergeExec},
- union::UnionExec,
- EmptyRecordBatchStream, ExecutionPlan,
+ execute_stream, memory::MemoryExec, sorts::sort::SortExec,
union::UnionExec,
+ EmptyRecordBatchStream,
},
physical_planner::create_physical_sort_exprs,
prelude::{ident, SessionContext},
@@ -58,11 +52,10 @@ use crate::{
compaction::{CompactionScheduler, SchedulerConfig},
ensure,
manifest::{Manifest, ManifestRef},
- operator::{BytesMergeOperator, LastValueOperator},
- read::{DefaultParquetFileReaderFactory, MergeExec},
- sst::{allocate_id, FileMeta, SstFile, SstPathGenerator},
+ read::ParquetReader,
+ sst::{allocate_id, FileMeta, SstPathGenerator},
types::{
- ObjectStoreRef, RuntimeOptions, StorageOptions, TimeRange, UpdateMode,
WriteOptions,
+ ObjectStoreRef, RuntimeOptions, StorageOptions, StorageSchema,
TimeRange, WriteOptions,
WriteResult, SEQ_COLUMN_NAME,
},
Result,
@@ -136,15 +129,10 @@ pub struct CloudObjectStorage {
segment_duration: Duration,
path: String,
store: ObjectStoreRef,
- arrow_schema: SchemaRef,
- num_primary_keys: usize,
- seq_idx: usize,
- value_idxes: Vec<usize>,
- update_mode: UpdateMode,
+ schema: StorageSchema,
manifest: ManifestRef,
runtimes: StorageRuntimes,
-
- df_schema: DFSchema,
+ parquet_reader: Arc<ParquetReader>,
write_props: WriterProperties,
sst_path_gen: Arc<SstPathGenerator>,
compact_scheduler: CompactionScheduler,
@@ -170,9 +158,6 @@ impl CloudObjectStorage {
num_primary_keys: usize,
storage_opts: StorageOptions,
) -> Result<Self> {
- let value_idxes =
(num_primary_keys..arrow_schema.fields.len()).collect::<Vec<_>>();
- ensure!(!value_idxes.is_empty(), "no value column found");
-
let runtimes = StorageRuntimes::new(storage_opts.runtime_opts)?;
let manifest = Arc::new(
Manifest::try_new(
@@ -183,42 +168,59 @@ impl CloudObjectStorage {
)
.await?,
);
-
- let mut new_fields = arrow_schema.fields.clone().to_vec();
- new_fields.push(Arc::new(Field::new(
- SEQ_COLUMN_NAME,
- DataType::UInt64,
- true,
- )));
- let seq_idx = new_fields.len() - 1;
- let arrow_schema = Arc::new(Schema::new_with_metadata(
- new_fields,
- arrow_schema.metadata.clone(),
- ));
- let df_schema =
DFSchema::try_from(arrow_schema.clone()).context("build DFSchema")?;
+ let schema = {
+ let value_idxes =
(num_primary_keys..arrow_schema.fields.len()).collect::<Vec<_>>();
+ ensure!(!value_idxes.is_empty(), "no value column found");
+
+ let mut new_fields = arrow_schema.fields.clone().to_vec();
+ new_fields.push(Arc::new(Field::new(
+ SEQ_COLUMN_NAME,
+ DataType::UInt64,
+ true,
+ )));
+ let seq_idx = new_fields.len() - 1;
+ let arrow_schema = Arc::new(Schema::new_with_metadata(
+ new_fields,
+ arrow_schema.metadata.clone(),
+ ));
+ let update_mode = storage_opts.update_mode;
+ StorageSchema {
+ arrow_schema,
+ num_primary_keys,
+ seq_idx,
+ value_idxes,
+ update_mode,
+ }
+ };
let write_props = Self::build_write_props(storage_opts.write_opts,
num_primary_keys);
let sst_path_gen = Arc::new(SstPathGenerator::new(path.clone()));
+ let parquet_reader = Arc::new(ParquetReader::new(
+ store.clone(),
+ schema.clone(),
+ sst_path_gen.clone(),
+ ));
+
let compact_scheduler = CompactionScheduler::new(
runtimes.sst_compact_runtime.clone(),
manifest.clone(),
store.clone(),
+ schema.clone(),
segment_duration,
sst_path_gen.clone(),
- SchedulerConfig::default(),
+ parquet_reader.clone(),
+ SchedulerConfig {
+ write_props: write_props.clone(),
+ ..Default::default()
+ },
);
- let update_mode = storage_opts.update_mode;
Ok(Self {
path,
- num_primary_keys,
- seq_idx,
- value_idxes,
- update_mode,
+ schema,
segment_duration,
store,
- arrow_schema,
manifest,
+ parquet_reader,
runtimes,
- df_schema,
write_props,
sst_path_gen,
compact_scheduler,
@@ -246,7 +248,7 @@ impl CloudObjectStorage {
// Since file_id in increasing order, we can use it as
sequence.
let seq_column = Arc::new(UInt64Array::from(vec![file_id;
batch.num_rows()]));
new_cols.push(seq_column);
- RecordBatch::try_new(self.arrow_schema.clone(), new_cols)
+ RecordBatch::try_new(self.schema.arrow_schema.clone(),
new_cols)
.context("construct record batch with seq column")?
};
writer
@@ -268,8 +270,8 @@ impl CloudObjectStorage {
})
}
- fn build_sort_exprs(&self, sort_seq: bool) -> Result<LexOrdering> {
- let mut sort_exprs = (0..self.num_primary_keys)
+ fn build_sort_exprs(&self, df_schema: &DFSchema, sort_seq: bool) ->
Result<LexOrdering> {
+ let mut sort_exprs = (0..self.schema.num_primary_keys)
.map(|i| {
ident(self.schema().field(i).name())
.sort(true /* asc */, true /* nulls_first */)
@@ -279,7 +281,7 @@ impl CloudObjectStorage {
sort_exprs.push(ident(SEQ_COLUMN_NAME).sort(true, true));
}
let sort_exprs =
- create_physical_sort_exprs(&sort_exprs, &self.df_schema,
&ExecutionProps::default())
+ create_physical_sort_exprs(&sort_exprs, df_schema,
&ExecutionProps::default())
.context("create physical sort exprs")?;
Ok(sort_exprs)
@@ -288,7 +290,8 @@ impl CloudObjectStorage {
async fn sort_batch(&self, batch: RecordBatch) ->
Result<SendableRecordBatchStream> {
let ctx = SessionContext::default();
let schema = batch.schema();
- let sort_exprs = self.build_sort_exprs(false /* sort_seq */)?;
+ let df_schema =
DFSchema::try_from(self.schema().clone()).context("build DFSchema")?;
+ let sort_exprs = self.build_sort_exprs(&df_schema, false /* sort_seq
*/)?;
let batch_plan =
MemoryExec::try_new(&[vec![batch]], schema, None).context("build
batch plan")?;
let physical_plan = Arc::new(SortExec::new(sort_exprs,
Arc::new(batch_plan)));
@@ -339,65 +342,12 @@ impl CloudObjectStorage {
builder.build()
}
-
- fn build_scan_plan(
- &self,
- ssts: Vec<SstFile>,
- projections: Option<Vec<usize>>,
- predicates: Vec<Expr>,
- ) -> Result<Arc<dyn ExecutionPlan>> {
- // we won't use url for selecting object_store.
- let dummy_url = ObjectStoreUrl::parse("empty://").unwrap();
- let sort_exprs = self.build_sort_exprs(true /* sort_seq */)?;
-
- let file_groups = ssts
- .into_iter()
- .map(|f| {
- vec![PartitionedFile::new(
- self.sst_path_gen.generate(f.id()),
- f.meta().size as u64,
- )]
- })
- .collect::<Vec<_>>();
- let scan_config = FileScanConfig::new(dummy_url, self.schema().clone())
- .with_output_ordering(vec![sort_exprs.clone(); file_groups.len()])
- .with_file_groups(file_groups)
- .with_projection(projections);
-
- let mut builder =
ParquetExec::builder(scan_config).with_parquet_file_reader_factory(
- Arc::new(DefaultParquetFileReaderFactory::new(self.store.clone())),
- );
- if let Some(expr) = conjunction(predicates) {
- let filters = create_physical_expr(&expr, &self.df_schema,
&ExecutionProps::new())
- .context("create physical expr")?;
- builder = builder.with_predicate(filters);
- }
-
- // TODO: fetch using multiple threads since read from parquet will
incur CPU
- // when convert between arrow and parquet.
- let parquet_exec = builder.build();
- let sort_exec = SortPreservingMergeExec::new(sort_exprs,
Arc::new(parquet_exec))
- // TODO: make fetch size configurable.
- .with_fetch(Some(1024))
- .with_round_robin_repartition(true);
-
- let merge_exec = MergeExec::new(
- Arc::new(sort_exec),
- self.num_primary_keys,
- self.seq_idx,
- match self.update_mode {
- UpdateMode::Overwrite => Arc::new(LastValueOperator),
- UpdateMode::Append =>
Arc::new(BytesMergeOperator::new(self.value_idxes.clone())),
- },
- );
- Ok(Arc::new(merge_exec))
- }
}
#[async_trait]
impl TimeMergeStorage for CloudObjectStorage {
fn schema(&self) -> &SchemaRef {
- &self.arrow_schema
+ &self.schema.arrow_schema
}
async fn write(&self, req: WriteRequest) -> Result<()> {
@@ -432,7 +382,7 @@ impl TimeMergeStorage for CloudObjectStorage {
let total_ssts = self.manifest.find_ssts(&req.range).await;
if total_ssts.is_empty() {
return Ok(Box::pin(EmptyRecordBatchStream::new(
- self.arrow_schema.clone(),
+ self.schema.arrow_schema.clone(),
)));
}
@@ -442,8 +392,11 @@ impl TimeMergeStorage for CloudObjectStorage {
let mut plan_for_all_segments = Vec::new();
for (_, ssts) in ssts_by_segment.sorted_by(|a, b| a.0.cmp(&b.0)) {
- let plan =
- self.build_scan_plan(ssts, req.projections.clone(),
req.predicate.clone())?;
+ let plan = self.parquet_reader.build_df_plan(
+ ssts,
+ req.projections.clone(),
+ req.predicate.clone(),
+ )?;
plan_for_all_segments.push(plan);
}
@@ -473,51 +426,6 @@ mod tests {
use super::*;
use crate::{arrow_schema, record_batch, test_util::check_stream,
types::Timestamp};
- #[tokio::test]
- async fn test_build_scan_plan() {
- let schema = arrow_schema!(("pk1", UInt8), ("value", UInt8));
- let store = Arc::new(LocalFileSystem::new());
- let storage = CloudObjectStorage::try_new(
- "mock".to_string(),
- Duration::from_hours(2),
- store,
- schema.clone(),
- 1, // num_primary_keys
- StorageOptions::default(),
- )
- .await
- .unwrap();
- let plan = storage
- .build_scan_plan(
- (100..103)
- .map(|id| {
- SstFile::new(
- id,
- FileMeta {
- max_sequence: id,
- num_rows: 1,
- size: 1,
- time_range: (1..10).into(),
- },
- )
- })
- .collect(),
- None,
- vec![],
- )
- .unwrap();
- let display_plan =
-
datafusion::physical_plan::display::DisplayableExecutionPlan::new(plan.as_ref())
- .indent(true);
- assert_eq!(
- r#"MergeExec: [primary_keys: 1, seq_idx: 2]
- SortPreservingMergeExec: [pk1@0 ASC, __seq__@2 ASC], fetch=1024
- ParquetExec: file_groups={3 groups: [[mock/data/100.sst],
[mock/data/101.sst], [mock/data/102.sst]]}, projection=[pk1, value, __seq__],
output_orderings=[[pk1@0 ASC, __seq__@2 ASC], [pk1@0 ASC, __seq__@2 ASC],
[pk1@0 ASC, __seq__@2 ASC]]
-"#,
- format!("{display_plan}")
- );
- }
-
#[test(tokio::test)]
async fn test_storage_write_and_scan() {
let schema = arrow_schema!(("pk1", UInt8), ("pk2", UInt8), ("value",
Int64));
diff --git a/src/metric_engine/src/types.rs b/src/metric_engine/src/types.rs
index c4972fe3..03d75ed4 100644
--- a/src/metric_engine/src/types.rs
+++ b/src/metric_engine/src/types.rs
@@ -22,6 +22,7 @@ use std::{
time::Duration,
};
+use arrow_schema::SchemaRef;
use object_store::ObjectStore;
use parquet::basic::{Compression, Encoding, ZstdLevel};
use tokio::runtime::Runtime;
@@ -111,6 +112,11 @@ impl TimeRange {
pub fn overlaps(&self, other: &TimeRange) -> bool {
self.0.start < other.0.end && other.0.start < self.0.end
}
+
+ pub fn merge(&mut self, other: &TimeRange) {
+ self.0.start = self.0.start.min(other.0.start);
+ self.0.end = self.0.end.max(other.0.end);
+ }
}
pub type ObjectStoreRef = Arc<dyn ObjectStore>;
@@ -194,7 +200,7 @@ impl Default for ManifestMergeOptions {
}
}
-#[derive(Debug, Default)]
+#[derive(Debug, Default, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum UpdateMode {
#[default]
Overwrite,
@@ -209,6 +215,15 @@ pub struct StorageOptions {
pub update_mode: UpdateMode,
}
+#[derive(Debug, Clone)]
+pub struct StorageSchema {
+ pub arrow_schema: SchemaRef,
+ pub num_primary_keys: usize,
+ pub seq_idx: usize,
+ pub value_idxes: Vec<usize>,
+ pub update_mode: UpdateMode,
+}
+
#[cfg(test)]
mod tests {
use super::*;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]