This is an automated email from the ASF dual-hosted git repository.

jiacai2050 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/horaedb.git


The following commit(s) were added to refs/heads/main by this push:
     new e65c5044 feat: init scan implementation for time merge storage (#1586)
e65c5044 is described below

commit e65c50449e26edb0aa8c5fa695b7ce9eea1c3208
Author: Jiacai Liu <[email protected]>
AuthorDate: Tue Nov 5 16:44:11 2024 +0800

    feat: init scan implementation for time merge storage (#1586)
    
    ## Rationale
    
    
    ## Detailed Changes
    
    
    ## Test Plan
    CI
---
 horaedb/metric_engine/src/lib.rs      |   1 +
 horaedb/metric_engine/src/manifest.rs |  40 ++++++++-----
 horaedb/metric_engine/src/read.rs     |  55 ++++++++++++++++++
 horaedb/metric_engine/src/sst.rs      |  30 ++++++++--
 horaedb/metric_engine/src/storage.rs  | 106 +++++++++++++++++++++++++---------
 horaedb/metric_engine/src/types.rs    |  84 +++++++++++++++++++++++----
 horaedb/pb_types/protos/sst.proto     |   3 +-
 7 files changed, 260 insertions(+), 59 deletions(-)

diff --git a/horaedb/metric_engine/src/lib.rs b/horaedb/metric_engine/src/lib.rs
index 8a05223c..be7ef2b4 100644
--- a/horaedb/metric_engine/src/lib.rs
+++ b/horaedb/metric_engine/src/lib.rs
@@ -19,6 +19,7 @@
 
 pub mod error;
 mod manifest;
+mod read;
 mod sst;
 pub mod storage;
 pub mod types;
diff --git a/horaedb/metric_engine/src/manifest.rs 
b/horaedb/metric_engine/src/manifest.rs
index aceac3da..0d865a04 100644
--- a/horaedb/metric_engine/src/manifest.rs
+++ b/horaedb/metric_engine/src/manifest.rs
@@ -23,7 +23,7 @@ use tokio::sync::RwLock;
 
 use crate::{
     sst::{FileId, FileMeta, SstFile},
-    types::ObjectStoreRef,
+    types::{ObjectStoreRef, TimeRange},
     AnyhowError, Error, Result,
 };
 
@@ -56,6 +56,18 @@ impl TryFrom<pb_types::Manifest> for Payload {
     }
 }
 
+impl From<Payload> for pb_types::Manifest {
+    fn from(value: Payload) -> Self {
+        pb_types::Manifest {
+            files: value
+                .files
+                .into_iter()
+                .map(pb_types::SstFile::from)
+                .collect(),
+        }
+    }
+}
+
 impl Manifest {
     pub async fn try_new(path: String, store: ObjectStoreRef) -> Result<Self> {
         let snapshot_path = Path::from(format!("{path}/{SNAPSHOT_FILENAME}"));
@@ -97,20 +109,7 @@ impl Manifest {
         let new_sst = SstFile { id, meta };
         tmp_ssts.push(new_sst.clone());
         let pb_manifest = pb_types::Manifest {
-            files: tmp_ssts
-                .into_iter()
-                .map(|f| pb_types::SstFile {
-                    id: f.id,
-                    meta: Some(pb_types::SstMeta {
-                        max_sequence: f.meta.max_sequence,
-                        num_rows: f.meta.num_rows,
-                        time_range: Some(pb_types::TimeRange {
-                            start: f.meta.time_range.start,
-                            end: f.meta.time_range.end,
-                        }),
-                    }),
-                })
-                .collect::<Vec<_>>(),
+            files: tmp_ssts.into_iter().map(|f| f.into()).collect::<Vec<_>>(),
         };
 
         let mut buf = Vec::with_capacity(pb_manifest.encoded_len());
@@ -130,4 +129,15 @@ impl Manifest {
 
         Ok(())
     }
+
+    pub async fn find_ssts(&self, time_range: &TimeRange) -> Vec<SstFile> {
+        let payload = self.payload.read().await;
+
+        payload
+            .files
+            .iter()
+            .filter(move |f| f.meta.time_range.overlaps(time_range))
+            .cloned()
+            .collect()
+    }
 }
diff --git a/horaedb/metric_engine/src/read.rs 
b/horaedb/metric_engine/src/read.rs
new file mode 100644
index 00000000..88522fe9
--- /dev/null
+++ b/horaedb/metric_engine/src/read.rs
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion::{
+    datasource::physical_plan::{FileMeta, ParquetFileReaderFactory},
+    error::Result as DfResult,
+    parquet::arrow::async_reader::AsyncFileReader,
+    physical_plan::metrics::ExecutionPlanMetricsSet,
+};
+use parquet::arrow::async_reader::ParquetObjectReader;
+
+use crate::types::ObjectStoreRef;
+
+#[derive(Debug, Clone)]
+pub struct DefaultParquetFileReaderFactory {
+    object_store: ObjectStoreRef,
+}
+
+/// Returns a AsyncFileReader factory
+impl DefaultParquetFileReaderFactory {
+    pub fn new(object_store: ObjectStoreRef) -> Self {
+        Self { object_store }
+    }
+}
+
+impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
+    fn create_reader(
+        &self,
+        _partition_index: usize,
+        file_meta: FileMeta,
+        metadata_size_hint: Option<usize>,
+        _metrics: &ExecutionPlanMetricsSet,
+    ) -> DfResult<Box<dyn AsyncFileReader + Send>> {
+        let object_store = self.object_store.clone();
+        let mut reader = ParquetObjectReader::new(object_store, 
file_meta.object_meta);
+        if let Some(size) = metadata_size_hint {
+            reader = reader.with_footer_size_hint(size);
+        }
+        Ok(Box::new(reader))
+    }
+}
diff --git a/horaedb/metric_engine/src/sst.rs b/horaedb/metric_engine/src/sst.rs
index 5eb96867..703b4bc4 100644
--- a/horaedb/metric_engine/src/sst.rs
+++ b/horaedb/metric_engine/src/sst.rs
@@ -49,10 +49,20 @@ impl TryFrom<pb_types::SstFile> for SstFile {
     }
 }
 
+impl From<SstFile> for pb_types::SstFile {
+    fn from(value: SstFile) -> Self {
+        pb_types::SstFile {
+            id: value.id,
+            meta: Some(value.meta.into()),
+        }
+    }
+}
+
 #[derive(Clone, Debug)]
 pub struct FileMeta {
     pub max_sequence: u64,
     pub num_rows: u32,
+    pub size: u32,
     pub time_range: TimeRange,
 }
 
@@ -66,14 +76,26 @@ impl TryFrom<pb_types::SstMeta> for FileMeta {
         Ok(Self {
             max_sequence: value.max_sequence,
             num_rows: value.num_rows,
-            time_range: TimeRange {
-                start: time_range.start,
-                end: time_range.end,
-            },
+            size: value.size,
+            time_range: TimeRange::new(time_range.start.into(), 
time_range.end.into()),
         })
     }
 }
 
+impl From<FileMeta> for pb_types::SstMeta {
+    fn from(value: FileMeta) -> Self {
+        pb_types::SstMeta {
+            max_sequence: value.max_sequence,
+            num_rows: value.num_rows,
+            size: value.size,
+            time_range: Some(pb_types::TimeRange {
+                start: *value.time_range.start,
+                end: *value.time_range.end,
+            }),
+        }
+    }
+}
+
 // Used for sst file id allocation.
 // This number mustn't go backwards on restarts, otherwise file id
 // collisions are possible. So don't change time on the server
diff --git a/horaedb/metric_engine/src/storage.rs 
b/horaedb/metric_engine/src/storage.rs
index 3a8d42f0..b64e7a8d 100644
--- a/horaedb/metric_engine/src/storage.rs
+++ b/horaedb/metric_engine/src/storage.rs
@@ -25,10 +25,13 @@ use arrow::{
 use async_trait::async_trait;
 use datafusion::{
     common::DFSchema,
-    execution::{
-        context::ExecutionProps, SendableRecordBatchStream as 
DFSendableRecordBatchStream,
+    datasource::{
+        listing::PartitionedFile,
+        physical_plan::{FileScanConfig, ParquetExec},
     },
-    logical_expr::Expr,
+    execution::{context::ExecutionProps, object_store::ObjectStoreUrl, 
SendableRecordBatchStream},
+    logical_expr::{utils::conjunction, Expr},
+    physical_expr::{create_physical_expr, LexOrdering},
     physical_plan::{execute_stream, memory::MemoryExec, sorts::sort::SortExec},
     physical_planner::create_physical_sort_exprs,
     prelude::{ident, SessionContext},
@@ -43,8 +46,9 @@ use parquet::{
 
 use crate::{
     manifest::Manifest,
+    read::DefaultParquetFileReaderFactory,
     sst::{allocate_id, FileId, FileMeta},
-    types::{ObjectStoreRef, SendableRecordBatchStream, TimeRange, Timestamp},
+    types::{ObjectStoreRef, TimeRange, Timestamp, WriteResult},
     Result,
 };
 
@@ -55,7 +59,7 @@ pub struct WriteRequest {
 
 pub struct ScanRequest {
     range: TimeRange,
-    predicate: Expr,
+    predicate: Vec<Expr>,
     /// `None` means all columns.
     projections: Option<Vec<usize>>,
 }
@@ -84,6 +88,8 @@ pub struct CloudObjectStorage {
     num_primary_key: usize,
     timestamp_index: usize,
     manifest: Manifest,
+
+    df_schema: DFSchema,
 }
 
 /// It will organize the data in the following way:
@@ -107,6 +113,7 @@ impl CloudObjectStorage {
         let manifest_prefix = crate::manifest::PREFIX_PATH;
         let manifest =
             Manifest::try_new(format!("{root_path}/{manifest_prefix}"), 
store.clone()).await?;
+        let df_schema = 
DFSchema::try_from(arrow_schema.clone()).context("build DFSchema")?;
         Ok(Self {
             path: root_path,
             num_primary_key,
@@ -114,6 +121,7 @@ impl CloudObjectStorage {
             store,
             arrow_schema,
             manifest,
+            df_schema,
         })
     }
 
@@ -123,11 +131,11 @@ impl CloudObjectStorage {
         format!("{root}/{prefix}/{id}")
     }
 
-    async fn write_batch(&self, req: WriteRequest) -> Result<FileId> {
+    async fn write_batch(&self, req: WriteRequest) -> Result<WriteResult> {
         let file_id = allocate_id();
         let file_path = self.build_file_path(file_id);
-        let object_store_writer =
-            ParquetObjectWriter::new(self.store.clone(), 
Path::from(file_path));
+        let file_path = Path::from(file_path);
+        let object_store_writer = ParquetObjectWriter::new(self.store.clone(), 
file_path.clone());
         let mut writer =
             AsyncArrowWriter::try_new(object_store_writer, 
self.schema().clone(), req.props)
                 .context("create arrow writer")?;
@@ -139,27 +147,38 @@ impl CloudObjectStorage {
             writer.write(&batch).await.context("write arrow batch")?;
         }
         writer.close().await.context("close arrow writer")?;
-
-        Ok(file_id)
+        let object_meta = self
+            .store
+            .head(&file_path)
+            .await
+            .context("get object meta")?;
+
+        Ok(WriteResult {
+            id: file_id,
+            size: object_meta.size,
+        })
     }
 
-    async fn sort_batch(&self, batch: RecordBatch) -> 
Result<DFSendableRecordBatchStream> {
-        let ctx = SessionContext::default();
-        let schema = batch.schema();
-        let df_schema = DFSchema::try_from(schema.clone()).context("build 
DFSchema")?;
-
+    fn build_sort_exprs(&self) -> Result<LexOrdering> {
         let sort_exprs = (0..self.num_primary_key)
             .collect::<Vec<_>>()
             .iter()
-            .map(|i| ident(schema.clone().field(*i).name()).sort(true, true))
+            .map(|i| ident(self.schema().field(*i).name()).sort(true, true))
             .collect::<Vec<_>>();
-        let physical_sort_exprs =
-            create_physical_sort_exprs(&sort_exprs, &df_schema, 
&ExecutionProps::default())
+        let sort_exprs =
+            create_physical_sort_exprs(&sort_exprs, &self.df_schema, 
&ExecutionProps::default())
                 .context("create physical sort exprs")?;
 
+        Ok(sort_exprs)
+    }
+
+    async fn sort_batch(&self, batch: RecordBatch) -> 
Result<SendableRecordBatchStream> {
+        let ctx = SessionContext::default();
+        let schema = batch.schema();
+        let sort_exprs = self.build_sort_exprs()?;
         let batch_plan =
             MemoryExec::try_new(&[vec![batch]], schema, None).context("build 
batch plan")?;
-        let physical_plan = Arc::new(SortExec::new(physical_sort_exprs, 
Arc::new(batch_plan)));
+        let physical_plan = Arc::new(SortExec::new(sort_exprs, 
Arc::new(batch_plan)));
 
         let res =
             execute_stream(physical_plan, ctx.task_ctx()).context("execute 
sort physical plan")?;
@@ -187,17 +206,18 @@ impl TimeMergeStorage for CloudObjectStorage {
         let mut start = Timestamp::MAX;
         let mut end = Timestamp::MIN;
         for v in time_column.values() {
-            start = start.min(*v);
-            end = end.max(*v);
+            start = start.min(Timestamp(*v));
+            end = end.max(Timestamp(*v));
         }
-        let time_range = TimeRange {
-            start,
-            end: end + 1,
-        };
-        let file_id = self.write_batch(req).await?;
+        let time_range = TimeRange::new(start, end + 1);
+        let WriteResult {
+            id: file_id,
+            size: file_size,
+        } = self.write_batch(req).await?;
         let file_meta = FileMeta {
             max_sequence: file_id, // Since file_id in increasing order, we 
can use it as sequence.
             num_rows: num_rows as u32,
+            size: file_size as u32,
             time_range,
         };
         self.manifest.add_file(file_id, file_meta).await?;
@@ -206,7 +226,39 @@ impl TimeMergeStorage for CloudObjectStorage {
     }
 
     async fn scan(&self, req: ScanRequest) -> 
Result<SendableRecordBatchStream> {
-        todo!()
+        let ssts = self.manifest.find_ssts(&req.range).await;
+        // we won't use url for selecting object_store.
+        let dummy_url = ObjectStoreUrl::parse("empty://").unwrap();
+        // TODO: we could group ssts based on time range.
+        // TODO: fetch using multiple threads since read from parquet will 
incur CPU
+        // when convert between arrow and parquet.
+        let file_groups = ssts
+            .iter()
+            .map(|f| PartitionedFile::new(self.build_file_path(f.id), 
f.meta.size as u64))
+            .collect::<Vec<_>>();
+        let scan_config = FileScanConfig::new(dummy_url, self.schema().clone())
+            .with_file_group(file_groups)
+            .with_projection(req.projections);
+
+        let mut builder = 
ParquetExec::builder(scan_config).with_parquet_file_reader_factory(
+            Arc::new(DefaultParquetFileReaderFactory::new(self.store.clone())),
+        );
+        if let Some(expr) = conjunction(req.predicate) {
+            let filters = create_physical_expr(&expr, &self.df_schema, 
&ExecutionProps::new())
+                .context("create pyhsical expr")?;
+            builder = builder.with_predicate(filters);
+        }
+
+        let parquet_exec = builder.build();
+        let sort_exprs = self.build_sort_exprs()?;
+        let physical_plan = Arc::new(SortExec::new(sort_exprs, 
Arc::new(parquet_exec)));
+
+        let ctx = SessionContext::default();
+        // TODO: dedup record batch based on primary keys and sequence number.
+        let res =
+            execute_stream(physical_plan, ctx.task_ctx()).context("execute 
sort physical plan")?;
+
+        Ok(res)
     }
 
     async fn compact(&self, req: CompactRequest) -> Result<()> {
diff --git a/horaedb/metric_engine/src/types.rs 
b/horaedb/metric_engine/src/types.rs
index 96a4b74a..e6b518a5 100644
--- a/horaedb/metric_engine/src/types.rs
+++ b/horaedb/metric_engine/src/types.rs
@@ -15,23 +15,83 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::{ops::Range, pin::Pin, sync::Arc};
+use std::{
+    ops::{Add, Deref, Range},
+    sync::Arc,
+};
 
-use arrow::{array::RecordBatch, datatypes::Schema};
-use futures::Stream;
 use object_store::ObjectStore;
 
-use crate::error::Result;
+use crate::sst::FileId;
 
-pub type Timestamp = i64;
-pub type TimeRange = Range<Timestamp>;
+#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
+pub struct Timestamp(pub i64);
 
-pub type ObjectStoreRef = Arc<dyn ObjectStore>;
+impl Add for Timestamp {
+    type Output = Self;
+
+    fn add(self, rhs: Self) -> Self::Output {
+        Self(self.0 + rhs.0)
+    }
+}
+
+impl Add<i64> for Timestamp {
+    type Output = Self;
+
+    fn add(self, rhs: i64) -> Self::Output {
+        Self(self.0 + rhs)
+    }
+}
 
-/// Trait for types that stream [arrow::record_batch::RecordBatch]
-pub trait RecordBatchStream: Stream<Item = Result<RecordBatch>> {
-    fn schema(&self) -> &Schema;
+impl From<i64> for Timestamp {
+    fn from(value: i64) -> Self {
+        Self(value)
+    }
 }
 
-/// Trait for a [`Stream`] of [`RecordBatch`]es
-pub type SendableRecordBatchStream = Pin<Box<dyn RecordBatchStream + Send>>;
+impl Deref for Timestamp {
+    type Target = i64;
+
+    fn deref(&self) -> &Self::Target {
+        &self.0
+    }
+}
+
+impl Timestamp {
+    pub const MAX: Timestamp = Timestamp(i64::MAX);
+    pub const MIN: Timestamp = Timestamp(i64::MIN);
+}
+
+#[derive(Clone, Debug)]
+pub struct TimeRange(Range<Timestamp>);
+
+impl From<Range<Timestamp>> for TimeRange {
+    fn from(value: Range<Timestamp>) -> Self {
+        Self(value)
+    }
+}
+
+impl Deref for TimeRange {
+    type Target = Range<Timestamp>;
+
+    fn deref(&self) -> &Self::Target {
+        &self.0
+    }
+}
+
+impl TimeRange {
+    pub fn new(start: Timestamp, end: Timestamp) -> Self {
+        Self(start..end)
+    }
+
+    pub fn overlaps(&self, other: &TimeRange) -> bool {
+        self.0.start < other.0.end && other.0.start < self.0.end
+    }
+}
+
+pub type ObjectStoreRef = Arc<dyn ObjectStore>;
+
+pub struct WriteResult {
+    pub id: FileId,
+    pub size: usize,
+}
diff --git a/horaedb/pb_types/protos/sst.proto 
b/horaedb/pb_types/protos/sst.proto
index ce3db301..5312ffa7 100644
--- a/horaedb/pb_types/protos/sst.proto
+++ b/horaedb/pb_types/protos/sst.proto
@@ -32,7 +32,8 @@ message TimeRange {
 message SstMeta {
   uint64 max_sequence = 1;
   uint32 num_rows = 2;
-  TimeRange time_range = 3;
+  uint32 size = 3;
+  TimeRange time_range = 4;
 }
 
 message SstFile {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to