This is an automated email from the ASF dual-hosted git repository.
jiacai2050 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/horaedb.git
The following commit(s) were added to refs/heads/main by this push:
new e65c5044 feat: init scan implementation for time merge storage (#1586)
e65c5044 is described below
commit e65c50449e26edb0aa8c5fa695b7ce9eea1c3208
Author: Jiacai Liu <[email protected]>
AuthorDate: Tue Nov 5 16:44:11 2024 +0800
feat: init scan implementation for time merge storage (#1586)
## Rationale
## Detailed Changes
## Test Plan
CI
---
horaedb/metric_engine/src/lib.rs | 1 +
horaedb/metric_engine/src/manifest.rs | 40 ++++++++-----
horaedb/metric_engine/src/read.rs | 55 ++++++++++++++++++
horaedb/metric_engine/src/sst.rs | 30 ++++++++--
horaedb/metric_engine/src/storage.rs | 106 +++++++++++++++++++++++++---------
horaedb/metric_engine/src/types.rs | 84 +++++++++++++++++++++++----
horaedb/pb_types/protos/sst.proto | 3 +-
7 files changed, 260 insertions(+), 59 deletions(-)
diff --git a/horaedb/metric_engine/src/lib.rs b/horaedb/metric_engine/src/lib.rs
index 8a05223c..be7ef2b4 100644
--- a/horaedb/metric_engine/src/lib.rs
+++ b/horaedb/metric_engine/src/lib.rs
@@ -19,6 +19,7 @@
pub mod error;
mod manifest;
+mod read;
mod sst;
pub mod storage;
pub mod types;
diff --git a/horaedb/metric_engine/src/manifest.rs b/horaedb/metric_engine/src/manifest.rs
index aceac3da..0d865a04 100644
--- a/horaedb/metric_engine/src/manifest.rs
+++ b/horaedb/metric_engine/src/manifest.rs
@@ -23,7 +23,7 @@ use tokio::sync::RwLock;
use crate::{
sst::{FileId, FileMeta, SstFile},
- types::ObjectStoreRef,
+ types::{ObjectStoreRef, TimeRange},
AnyhowError, Error, Result,
};
@@ -56,6 +56,18 @@ impl TryFrom<pb_types::Manifest> for Payload {
}
}
+impl From<Payload> for pb_types::Manifest {
+ fn from(value: Payload) -> Self {
+ pb_types::Manifest {
+ files: value
+ .files
+ .into_iter()
+ .map(pb_types::SstFile::from)
+ .collect(),
+ }
+ }
+}
+
impl Manifest {
pub async fn try_new(path: String, store: ObjectStoreRef) -> Result<Self> {
let snapshot_path = Path::from(format!("{path}/{SNAPSHOT_FILENAME}"));
@@ -97,20 +109,7 @@ impl Manifest {
let new_sst = SstFile { id, meta };
tmp_ssts.push(new_sst.clone());
let pb_manifest = pb_types::Manifest {
- files: tmp_ssts
- .into_iter()
- .map(|f| pb_types::SstFile {
- id: f.id,
- meta: Some(pb_types::SstMeta {
- max_sequence: f.meta.max_sequence,
- num_rows: f.meta.num_rows,
- time_range: Some(pb_types::TimeRange {
- start: f.meta.time_range.start,
- end: f.meta.time_range.end,
- }),
- }),
- })
- .collect::<Vec<_>>(),
+ files: tmp_ssts.into_iter().map(|f| f.into()).collect::<Vec<_>>(),
};
let mut buf = Vec::with_capacity(pb_manifest.encoded_len());
@@ -130,4 +129,15 @@ impl Manifest {
Ok(())
}
+
+ pub async fn find_ssts(&self, time_range: &TimeRange) -> Vec<SstFile> {
+ let payload = self.payload.read().await;
+
+ payload
+ .files
+ .iter()
+ .filter(move |f| f.meta.time_range.overlaps(time_range))
+ .cloned()
+ .collect()
+ }
}
diff --git a/horaedb/metric_engine/src/read.rs b/horaedb/metric_engine/src/read.rs
new file mode 100644
index 00000000..88522fe9
--- /dev/null
+++ b/horaedb/metric_engine/src/read.rs
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion::{
+ datasource::physical_plan::{FileMeta, ParquetFileReaderFactory},
+ error::Result as DfResult,
+ parquet::arrow::async_reader::AsyncFileReader,
+ physical_plan::metrics::ExecutionPlanMetricsSet,
+};
+use parquet::arrow::async_reader::ParquetObjectReader;
+
+use crate::types::ObjectStoreRef;
+
+#[derive(Debug, Clone)]
+pub struct DefaultParquetFileReaderFactory {
+ object_store: ObjectStoreRef,
+}
+
+/// Returns an AsyncFileReader factory
+impl DefaultParquetFileReaderFactory {
+ pub fn new(object_store: ObjectStoreRef) -> Self {
+ Self { object_store }
+ }
+}
+
+impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
+ fn create_reader(
+ &self,
+ _partition_index: usize,
+ file_meta: FileMeta,
+ metadata_size_hint: Option<usize>,
+ _metrics: &ExecutionPlanMetricsSet,
+ ) -> DfResult<Box<dyn AsyncFileReader + Send>> {
+ let object_store = self.object_store.clone();
+ let mut reader = ParquetObjectReader::new(object_store, file_meta.object_meta);
+ if let Some(size) = metadata_size_hint {
+ reader = reader.with_footer_size_hint(size);
+ }
+ Ok(Box::new(reader))
+ }
+}
diff --git a/horaedb/metric_engine/src/sst.rs b/horaedb/metric_engine/src/sst.rs
index 5eb96867..703b4bc4 100644
--- a/horaedb/metric_engine/src/sst.rs
+++ b/horaedb/metric_engine/src/sst.rs
@@ -49,10 +49,20 @@ impl TryFrom<pb_types::SstFile> for SstFile {
}
}
+impl From<SstFile> for pb_types::SstFile {
+ fn from(value: SstFile) -> Self {
+ pb_types::SstFile {
+ id: value.id,
+ meta: Some(value.meta.into()),
+ }
+ }
+}
+
#[derive(Clone, Debug)]
pub struct FileMeta {
pub max_sequence: u64,
pub num_rows: u32,
+ pub size: u32,
pub time_range: TimeRange,
}
@@ -66,14 +76,26 @@ impl TryFrom<pb_types::SstMeta> for FileMeta {
Ok(Self {
max_sequence: value.max_sequence,
num_rows: value.num_rows,
- time_range: TimeRange {
- start: time_range.start,
- end: time_range.end,
- },
+ size: value.size,
+ time_range: TimeRange::new(time_range.start.into(), time_range.end.into()),
})
}
}
+impl From<FileMeta> for pb_types::SstMeta {
+ fn from(value: FileMeta) -> Self {
+ pb_types::SstMeta {
+ max_sequence: value.max_sequence,
+ num_rows: value.num_rows,
+ size: value.size,
+ time_range: Some(pb_types::TimeRange {
+ start: *value.time_range.start,
+ end: *value.time_range.end,
+ }),
+ }
+ }
+}
+
// Used for sst file id allocation.
// This number mustn't go backwards on restarts, otherwise file id
// collisions are possible. So don't change time on the server
diff --git a/horaedb/metric_engine/src/storage.rs b/horaedb/metric_engine/src/storage.rs
index 3a8d42f0..b64e7a8d 100644
--- a/horaedb/metric_engine/src/storage.rs
+++ b/horaedb/metric_engine/src/storage.rs
@@ -25,10 +25,13 @@ use arrow::{
use async_trait::async_trait;
use datafusion::{
common::DFSchema,
- execution::{
- context::ExecutionProps, SendableRecordBatchStream as DFSendableRecordBatchStream,
+ datasource::{
+ listing::PartitionedFile,
+ physical_plan::{FileScanConfig, ParquetExec},
},
- logical_expr::Expr,
+ execution::{context::ExecutionProps, object_store::ObjectStoreUrl, SendableRecordBatchStream},
+ logical_expr::{utils::conjunction, Expr},
+ physical_expr::{create_physical_expr, LexOrdering},
physical_plan::{execute_stream, memory::MemoryExec, sorts::sort::SortExec},
physical_planner::create_physical_sort_exprs,
prelude::{ident, SessionContext},
@@ -43,8 +46,9 @@ use parquet::{
use crate::{
manifest::Manifest,
+ read::DefaultParquetFileReaderFactory,
sst::{allocate_id, FileId, FileMeta},
- types::{ObjectStoreRef, SendableRecordBatchStream, TimeRange, Timestamp},
+ types::{ObjectStoreRef, TimeRange, Timestamp, WriteResult},
Result,
};
@@ -55,7 +59,7 @@ pub struct WriteRequest {
pub struct ScanRequest {
range: TimeRange,
- predicate: Expr,
+ predicate: Vec<Expr>,
/// `None` means all columns.
projections: Option<Vec<usize>>,
}
@@ -84,6 +88,8 @@ pub struct CloudObjectStorage {
num_primary_key: usize,
timestamp_index: usize,
manifest: Manifest,
+
+ df_schema: DFSchema,
}
/// It will organize the data in the following way:
@@ -107,6 +113,7 @@ impl CloudObjectStorage {
let manifest_prefix = crate::manifest::PREFIX_PATH;
let manifest =
Manifest::try_new(format!("{root_path}/{manifest_prefix}"), store.clone()).await?;
+ let df_schema = DFSchema::try_from(arrow_schema.clone()).context("build DFSchema")?;
Ok(Self {
path: root_path,
num_primary_key,
@@ -114,6 +121,7 @@ impl CloudObjectStorage {
store,
arrow_schema,
manifest,
+ df_schema,
})
}
@@ -123,11 +131,11 @@ impl CloudObjectStorage {
format!("{root}/{prefix}/{id}")
}
- async fn write_batch(&self, req: WriteRequest) -> Result<FileId> {
+ async fn write_batch(&self, req: WriteRequest) -> Result<WriteResult> {
let file_id = allocate_id();
let file_path = self.build_file_path(file_id);
- let object_store_writer =
- ParquetObjectWriter::new(self.store.clone(), Path::from(file_path));
+ let file_path = Path::from(file_path);
+ let object_store_writer = ParquetObjectWriter::new(self.store.clone(), file_path.clone());
let mut writer =
AsyncArrowWriter::try_new(object_store_writer, self.schema().clone(), req.props)
.context("create arrow writer")?;
@@ -139,27 +147,38 @@ impl CloudObjectStorage {
writer.write(&batch).await.context("write arrow batch")?;
}
writer.close().await.context("close arrow writer")?;
-
- Ok(file_id)
+ let object_meta = self
+ .store
+ .head(&file_path)
+ .await
+ .context("get object meta")?;
+
+ Ok(WriteResult {
+ id: file_id,
+ size: object_meta.size,
+ })
}
- async fn sort_batch(&self, batch: RecordBatch) -> Result<DFSendableRecordBatchStream> {
- let ctx = SessionContext::default();
- let schema = batch.schema();
- let df_schema = DFSchema::try_from(schema.clone()).context("build DFSchema")?;
-
+ fn build_sort_exprs(&self) -> Result<LexOrdering> {
let sort_exprs = (0..self.num_primary_key)
.collect::<Vec<_>>()
.iter()
- .map(|i| ident(schema.clone().field(*i).name()).sort(true, true))
+ .map(|i| ident(self.schema().field(*i).name()).sort(true, true))
.collect::<Vec<_>>();
- let physical_sort_exprs =
- create_physical_sort_exprs(&sort_exprs, &df_schema, &ExecutionProps::default())
+ let sort_exprs =
+ create_physical_sort_exprs(&sort_exprs, &self.df_schema, &ExecutionProps::default())
.context("create physical sort exprs")?;
+ Ok(sort_exprs)
+ }
+
+ async fn sort_batch(&self, batch: RecordBatch) -> Result<SendableRecordBatchStream> {
+ let ctx = SessionContext::default();
+ let schema = batch.schema();
+ let sort_exprs = self.build_sort_exprs()?;
let batch_plan =
MemoryExec::try_new(&[vec![batch]], schema, None).context("build batch plan")?;
- let physical_plan = Arc::new(SortExec::new(physical_sort_exprs, Arc::new(batch_plan)));
+ let physical_plan = Arc::new(SortExec::new(sort_exprs, Arc::new(batch_plan)));
let res =
execute_stream(physical_plan, ctx.task_ctx()).context("execute sort physical plan")?;
@@ -187,17 +206,18 @@ impl TimeMergeStorage for CloudObjectStorage {
let mut start = Timestamp::MAX;
let mut end = Timestamp::MIN;
for v in time_column.values() {
- start = start.min(*v);
- end = end.max(*v);
+ start = start.min(Timestamp(*v));
+ end = end.max(Timestamp(*v));
}
- let time_range = TimeRange {
- start,
- end: end + 1,
- };
- let file_id = self.write_batch(req).await?;
+ let time_range = TimeRange::new(start, end + 1);
+ let WriteResult {
+ id: file_id,
+ size: file_size,
+ } = self.write_batch(req).await?;
let file_meta = FileMeta {
max_sequence: file_id, // Since file_id in increasing order, we can use it as sequence.
num_rows: num_rows as u32,
+ size: file_size as u32,
time_range,
};
self.manifest.add_file(file_id, file_meta).await?;
@@ -206,7 +226,39 @@ impl TimeMergeStorage for CloudObjectStorage {
}
async fn scan(&self, req: ScanRequest) -> Result<SendableRecordBatchStream> {
- todo!()
+ let ssts = self.manifest.find_ssts(&req.range).await;
+ // we won't use url for selecting object_store.
+ let dummy_url = ObjectStoreUrl::parse("empty://").unwrap();
+ // TODO: we could group ssts based on time range.
+ // TODO: fetch using multiple threads since read from parquet will incur CPU
+ // when convert between arrow and parquet.
+ let file_groups = ssts
+ .iter()
+ .map(|f| PartitionedFile::new(self.build_file_path(f.id), f.meta.size as u64))
+ .collect::<Vec<_>>();
+ let scan_config = FileScanConfig::new(dummy_url, self.schema().clone())
+ .with_file_group(file_groups)
+ .with_projection(req.projections);
+
+ let mut builder = ParquetExec::builder(scan_config).with_parquet_file_reader_factory(
+ Arc::new(DefaultParquetFileReaderFactory::new(self.store.clone())),
+ );
+ if let Some(expr) = conjunction(req.predicate) {
+ let filters = create_physical_expr(&expr, &self.df_schema, &ExecutionProps::new())
+ .context("create pyhsical expr")?;
+ builder = builder.with_predicate(filters);
+ }
+
+ let parquet_exec = builder.build();
+ let sort_exprs = self.build_sort_exprs()?;
+ let physical_plan = Arc::new(SortExec::new(sort_exprs, Arc::new(parquet_exec)));
+
+ let ctx = SessionContext::default();
+ // TODO: dedup record batch based on primary keys and sequence number.
+ let res =
+ execute_stream(physical_plan, ctx.task_ctx()).context("execute sort physical plan")?;
+
+ Ok(res)
}
async fn compact(&self, req: CompactRequest) -> Result<()> {
diff --git a/horaedb/metric_engine/src/types.rs b/horaedb/metric_engine/src/types.rs
index 96a4b74a..e6b518a5 100644
--- a/horaedb/metric_engine/src/types.rs
+++ b/horaedb/metric_engine/src/types.rs
@@ -15,23 +15,83 @@
// specific language governing permissions and limitations
// under the License.
-use std::{ops::Range, pin::Pin, sync::Arc};
+use std::{
+ ops::{Add, Deref, Range},
+ sync::Arc,
+};
-use arrow::{array::RecordBatch, datatypes::Schema};
-use futures::Stream;
use object_store::ObjectStore;
-use crate::error::Result;
+use crate::sst::FileId;
-pub type Timestamp = i64;
-pub type TimeRange = Range<Timestamp>;
+#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
+pub struct Timestamp(pub i64);
-pub type ObjectStoreRef = Arc<dyn ObjectStore>;
+impl Add for Timestamp {
+ type Output = Self;
+
+ fn add(self, rhs: Self) -> Self::Output {
+ Self(self.0 + rhs.0)
+ }
+}
+
+impl Add<i64> for Timestamp {
+ type Output = Self;
+
+ fn add(self, rhs: i64) -> Self::Output {
+ Self(self.0 + rhs)
+ }
+}
-/// Trait for types that stream [arrow::record_batch::RecordBatch]
-pub trait RecordBatchStream: Stream<Item = Result<RecordBatch>> {
- fn schema(&self) -> &Schema;
+impl From<i64> for Timestamp {
+ fn from(value: i64) -> Self {
+ Self(value)
+ }
}
-/// Trait for a [`Stream`] of [`RecordBatch`]es
-pub type SendableRecordBatchStream = Pin<Box<dyn RecordBatchStream + Send>>;
+impl Deref for Timestamp {
+ type Target = i64;
+
+ fn deref(&self) -> &Self::Target {
+ &self.0
+ }
+}
+
+impl Timestamp {
+ pub const MAX: Timestamp = Timestamp(i64::MAX);
+ pub const MIN: Timestamp = Timestamp(i64::MIN);
+}
+
+#[derive(Clone, Debug)]
+pub struct TimeRange(Range<Timestamp>);
+
+impl From<Range<Timestamp>> for TimeRange {
+ fn from(value: Range<Timestamp>) -> Self {
+ Self(value)
+ }
+}
+
+impl Deref for TimeRange {
+ type Target = Range<Timestamp>;
+
+ fn deref(&self) -> &Self::Target {
+ &self.0
+ }
+}
+
+impl TimeRange {
+ pub fn new(start: Timestamp, end: Timestamp) -> Self {
+ Self(start..end)
+ }
+
+ pub fn overlaps(&self, other: &TimeRange) -> bool {
+ self.0.start < other.0.end && other.0.start < self.0.end
+ }
+}
+
+pub type ObjectStoreRef = Arc<dyn ObjectStore>;
+
+pub struct WriteResult {
+ pub id: FileId,
+ pub size: usize,
+}
diff --git a/horaedb/pb_types/protos/sst.proto b/horaedb/pb_types/protos/sst.proto
index ce3db301..5312ffa7 100644
--- a/horaedb/pb_types/protos/sst.proto
+++ b/horaedb/pb_types/protos/sst.proto
@@ -32,7 +32,8 @@ message TimeRange {
message SstMeta {
uint64 max_sequence = 1;
uint32 num_rows = 2;
- TimeRange time_range = 3;
+ uint32 size = 3;
+ TimeRange time_range = 4;
}
message SstFile {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]