This is an automated email from the ASF dual-hosted git repository.

baojinri pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/horaedb.git


The following commit(s) were added to refs/heads/main by this push:
     new 1331e0a3 feat: add compaction runner (#1609)
1331e0a3 is described below

commit 1331e0a3d114d770abec071af7229b5281da6abc
Author: Jiacai Liu <[email protected]>
AuthorDate: Tue Dec 17 11:06:16 2024 +0800

    feat: add compaction runner (#1609)
    
    ## Rationale
    The compaction runner is responsible for compacting old SST files and
    deleting expired SST files.
    
    ## Detailed Changes
    
    
    ## Test Plan
    CI
---
 Cargo.lock                                    |  14 ++
 Cargo.toml                                    |   1 +
 Makefile                                      |   2 +-
 src/metric_engine/Cargo.toml                  |   1 +
 src/metric_engine/src/compaction/mod.rs       |   1 +
 src/metric_engine/src/compaction/runner.rs    | 172 ++++++++++++++++++++
 src/metric_engine/src/compaction/scheduler.rs |  47 +++---
 src/metric_engine/src/manifest.rs             |  14 ++
 src/metric_engine/src/read.rs                 | 178 +++++++++++++++++++--
 src/metric_engine/src/storage.rs              | 216 ++++++++------------------
 src/metric_engine/src/types.rs                |  17 +-
 11 files changed, 475 insertions(+), 188 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index d484dea6..2c660fc3 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1677,6 +1677,7 @@ dependencies = [
  "thiserror",
  "tokio",
  "tracing",
+ "uuid",
 ]
 
 [[package]]
@@ -2738,6 +2739,19 @@ source = 
"registry+https://github.com/rust-lang/crates.io-index";
 checksum = "81dfa00651efa65069b0b6b651f4aaa31ba9e3c3ce0137aaad053604ee7e0314"
 dependencies = [
  "getrandom",
+ "rand",
+ "uuid-macro-internal",
+]
+
+[[package]]
+name = "uuid-macro-internal"
+version = "1.11.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "6b91f57fe13a38d0ce9e28a03463d8d3c2468ed03d75375110ec71d93b449a08"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
 ]
 
 [[package]]
diff --git a/Cargo.toml b/Cargo.toml
index 4be3a049..4d121b50 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -50,6 +50,7 @@ tracing = "0.1"
 tracing-subscriber = "0.3"
 async-scoped = { version = "0.9.0", features = ["use-tokio"] }
 test-log = "0.2"
+uuid = { version = "1" }
 
 # This profile optimizes for good runtime performance.
 [profile.release]
diff --git a/Makefile b/Makefile
index 915073d5..b39aeb18 100644
--- a/Makefile
+++ b/Makefile
@@ -48,7 +48,7 @@ udeps:
 
 clippy:
        cd $(DIR); cargo clippy --all-targets --all-features --workspace -- -D 
warnings -D clippy::dbg-macro \
-       -A dead_code -A unused_variables -A clippy::unreachable # Remove these 
once we have a clean build
+       -A dead_code -A unused_variables -A clippy::unreachable -A 
clippy::too_many_arguments # Remove these once we have a clean build
 
 ensure-disk-quota:
        bash ./scripts/free-disk-space.sh
diff --git a/src/metric_engine/Cargo.toml b/src/metric_engine/Cargo.toml
index e84c4087..ca8dc576 100644
--- a/src/metric_engine/Cargo.toml
+++ b/src/metric_engine/Cargo.toml
@@ -50,6 +50,7 @@ prost = { workspace = true }
 thiserror = { workspace = true }
 tokio = { workspace = true }
 tracing = { workspace = true }
+uuid = { workspace = true, features = ["v4", "fast-rng", "macro-diagnostics"] }
 
 [dev-dependencies]
 temp-dir = { workspace = true }
diff --git a/src/metric_engine/src/compaction/mod.rs 
b/src/metric_engine/src/compaction/mod.rs
index a2037db5..62beda0a 100644
--- a/src/metric_engine/src/compaction/mod.rs
+++ b/src/metric_engine/src/compaction/mod.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 mod picker;
+mod runner;
 mod scheduler;
 
 pub use scheduler::{Scheduler as CompactionScheduler, SchedulerConfig};
diff --git a/src/metric_engine/src/compaction/runner.rs 
b/src/metric_engine/src/compaction/runner.rs
new file mode 100644
index 00000000..ce6f170e
--- /dev/null
+++ b/src/metric_engine/src/compaction/runner.rs
@@ -0,0 +1,172 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use anyhow::Context;
+use arrow::array::{RecordBatch, UInt64Array};
+use async_scoped::TokioScope;
+use datafusion::{execution::TaskContext, physical_plan::execute_stream};
+use futures::StreamExt;
+use object_store::path::Path;
+use parquet::{
+    arrow::{async_writer::ParquetObjectWriter, AsyncArrowWriter},
+    file::properties::WriterProperties,
+};
+use tracing::error;
+
+use crate::{
+    compaction::Task,
+    manifest::ManifestRef,
+    read::ParquetReader,
+    sst::{allocate_id, FileMeta, SstPathGenerator},
+    types::{ObjectStoreRef, StorageSchema},
+    Result,
+};
+
+#[derive(Clone)]
+pub struct Runner {
+    store: ObjectStoreRef,
+    schema: StorageSchema,
+    manifest: ManifestRef,
+    sst_path_gen: Arc<SstPathGenerator>,
+    parquet_reader: Arc<ParquetReader>,
+    write_props: WriterProperties,
+}
+
+impl Runner {
+    pub fn new(
+        store: ObjectStoreRef,
+        schema: StorageSchema,
+        manifest: ManifestRef,
+        sst_path_gen: Arc<SstPathGenerator>,
+        parquet_reader: Arc<ParquetReader>,
+        write_props: WriterProperties,
+    ) -> Self {
+        Self {
+            store,
+            schema,
+            manifest,
+            sst_path_gen,
+            parquet_reader,
+            write_props,
+        }
+    }
+
+    // TODO: Merge input sst files into one new sst file
+    // and delete the expired sst files
+    pub async fn do_compaction(&self, task: Task) -> Result<()> {
+        assert!(!task.inputs.is_empty());
+        for f in &task.inputs {
+            assert!(f.is_compaction());
+        }
+        for f in &task.expireds {
+            assert!(f.is_compaction());
+        }
+        let mut time_range = task.inputs[0].meta().time_range.clone();
+        for f in &task.inputs[1..] {
+            time_range.merge(&f.meta().time_range);
+        }
+        let plan = self
+            .parquet_reader
+            .build_df_plan(task.inputs.clone(), None, Vec::new())?;
+        let mut stream = execute_stream(plan, Arc::new(TaskContext::default()))
+            .context("execute datafusion plan")?;
+
+        let file_id = allocate_id();
+        let file_path = self.sst_path_gen.generate(file_id);
+        let file_path = Path::from(file_path);
+        let object_store_writer = ParquetObjectWriter::new(self.store.clone(), 
file_path.clone());
+        let mut writer = AsyncArrowWriter::try_new(
+            object_store_writer,
+            self.schema.arrow_schema.clone(),
+            Some(self.write_props.clone()),
+        )
+        .context("create arrow writer")?;
+        let mut num_rows = 0;
+        // TODO: support multi-part write
+        while let Some(batch) = stream.next().await {
+            let batch = batch.context("execute plan")?;
+            num_rows += batch.num_rows();
+            let batch_with_seq = {
+                let mut new_cols = batch.columns().to_vec();
+                // Since file_id in increasing order, we can use it as 
sequence.
+                let seq_column = Arc::new(UInt64Array::from(vec![file_id; 
batch.num_rows()]));
+                new_cols.push(seq_column);
+                RecordBatch::try_new(self.schema.arrow_schema.clone(), 
new_cols)
+                    .context("construct record batch with seq column")?
+            };
+
+            writer.write(&batch_with_seq).await.context("write batch")?;
+        }
+        writer.close().await.context("close writer")?;
+        let object_meta = self
+            .store
+            .head(&file_path)
+            .await
+            .context("get object meta")?;
+        let file_meta = FileMeta {
+            max_sequence: file_id,
+            num_rows: num_rows as u32,
+            size: object_meta.size as u32,
+            time_range: time_range.clone(),
+        };
+        // First add new sst to manifest, then delete expired/old sst
+        self.manifest.add_file(file_id, file_meta).await?;
+        self.manifest
+            .add_tombstone_files(task.expireds.clone())
+            .await?;
+        self.manifest
+            .add_tombstone_files(task.inputs.clone())
+            .await?;
+
+        let (_, results) = TokioScope::scope_and_block(|scope| {
+            for file in task.expireds {
+                let path = Path::from(self.sst_path_gen.generate(file.id()));
+                scope.spawn(async move {
+                    self.store
+                        .delete(&path)
+                        .await
+                        .with_context(|| format!("failed to delete file, 
path:{path}"))
+                });
+            }
+            for file in task.inputs {
+                let path = Path::from(self.sst_path_gen.generate(file.id()));
+                scope.spawn(async move {
+                    self.store
+                        .delete(&path)
+                        .await
+                        .with_context(|| format!("failed to delete file, 
path:{path}"))
+                });
+            }
+        });
+        for res in results {
+            match res {
+                Err(e) => {
+                    error!("Failed to join delete task, err:{e}")
+                }
+                Ok(v) => {
+                    if let Err(e) = v {
+                        error!("Failed to delete sst, err:{e}")
+                    }
+                }
+            }
+        }
+
+        Ok(())
+    }
+}
diff --git a/src/metric_engine/src/compaction/scheduler.rs 
b/src/metric_engine/src/compaction/scheduler.rs
index 97a16ce2..5e2c407a 100644
--- a/src/metric_engine/src/compaction/scheduler.rs
+++ b/src/metric_engine/src/compaction/scheduler.rs
@@ -21,6 +21,7 @@ use std::{
 };
 
 use anyhow::Context;
+use parquet::file::properties::WriterProperties;
 use tokio::{
     sync::mpsc::{self, Receiver, Sender},
     task::JoinHandle,
@@ -28,11 +29,13 @@ use tokio::{
 };
 use tracing::warn;
 
+use super::runner::Runner;
 use crate::{
     compaction::{picker::TimeWindowCompactionStrategy, Task},
     manifest::ManifestRef,
+    read::ParquetReader,
     sst::SstPathGenerator,
-    types::{ObjectStoreRef, RuntimeRef},
+    types::{ObjectStoreRef, RuntimeRef, StorageSchema},
     Result,
 };
 
@@ -50,8 +53,10 @@ impl Scheduler {
         runtime: RuntimeRef,
         manifest: ManifestRef,
         store: ObjectStoreRef,
+        schema: StorageSchema,
         segment_duration: Duration,
         sst_path_gen: Arc<SstPathGenerator>,
+        parquet_reader: Arc<ParquetReader>,
         config: SchedulerConfig,
     ) -> Self {
         let (task_tx, task_rx) = 
mpsc::channel(config.max_pending_compaction_tasks);
@@ -59,14 +64,18 @@ impl Scheduler {
             let rt = runtime.clone();
             let store = store.clone();
             let manifest = manifest.clone();
+            let write_props = config.write_props.clone();
             runtime.spawn(async move {
                 Self::recv_task_loop(
                     rt,
                     task_rx,
                     store,
+                    schema,
                     manifest,
                     sst_path_gen,
+                    parquet_reader,
                     config.memory_limit,
+                    write_props,
                 )
                 .await;
             })
@@ -99,15 +108,24 @@ impl Scheduler {
         rt: RuntimeRef,
         mut task_rx: Receiver<Task>,
         store: ObjectStoreRef,
+        schema: StorageSchema,
         manifest: ManifestRef,
-        _sst_path_gen: Arc<SstPathGenerator>,
+        sst_path_gen: Arc<SstPathGenerator>,
+        parquet_reader: Arc<ParquetReader>,
         _mem_limit: u64,
+        write_props: WriterProperties,
     ) {
+        let runner = Runner::new(
+            store,
+            schema,
+            manifest,
+            sst_path_gen,
+            parquet_reader,
+            write_props,
+        );
         while let Some(task) = task_rx.recv().await {
-            let store = store.clone();
-            let manifest = manifest.clone();
+            let runner = runner.clone();
             rt.spawn(async move {
-                let runner = Runner { store, manifest };
                 if let Err(e) = runner.do_compaction(task).await {
                     warn!("Do compaction failed, err:{e}");
                 }
@@ -121,8 +139,8 @@ impl Scheduler {
         segment_duration: Duration,
         config: SchedulerConfig,
     ) {
-        let compactor = TimeWindowCompactionStrategy::new(segment_duration, 
config);
         let schedule_interval = config.schedule_interval;
+        let compactor = TimeWindowCompactionStrategy::new(segment_duration, 
config);
         // TODO: obtain expire time
         let expire_time = None;
         loop {
@@ -138,12 +156,13 @@ impl Scheduler {
     }
 }
 
-#[derive(Clone, Copy)]
+#[derive(Clone)]
 pub struct SchedulerConfig {
     pub schedule_interval: Duration,
     pub memory_limit: u64,
     pub max_pending_compaction_tasks: usize,
     pub compaction_files_limit: usize,
+    pub write_props: WriterProperties,
 }
 
 impl Default for SchedulerConfig {
@@ -153,19 +172,7 @@ impl Default for SchedulerConfig {
             memory_limit: bytesize::gb(2_u64),
             max_pending_compaction_tasks: 10,
             compaction_files_limit: 10,
+            write_props: WriterProperties::default(),
         }
     }
 }
-
-pub struct Runner {
-    store: ObjectStoreRef,
-    manifest: ManifestRef,
-}
-
-impl Runner {
-    // TODO: Merge input sst files into one new sst file
-    // and delete the expired sst files
-    async fn do_compaction(&self, _task: Task) -> Result<()> {
-        todo!()
-    }
-}
diff --git a/src/metric_engine/src/manifest.rs 
b/src/metric_engine/src/manifest.rs
index 0cb7f51f..1a50b90e 100644
--- a/src/metric_engine/src/manifest.rs
+++ b/src/metric_engine/src/manifest.rs
@@ -41,6 +41,7 @@ use tokio::{
     },
 };
 use tracing::error;
+use uuid::Uuid;
 
 use crate::{
     ensure,
@@ -52,11 +53,13 @@ use crate::{
 pub const PREFIX_PATH: &str = "manifest";
 pub const SNAPSHOT_FILENAME: &str = "snapshot";
 pub const DELTA_PREFIX: &str = "delta";
+pub const TOMBSTONE_PREFIX: &str = "tombstone";
 
 pub type ManifestRef = Arc<Manifest>;
 
 pub struct Manifest {
     delta_dir: Path,
+    tombstone_dir: Path,
     store: ObjectStoreRef,
     merger: Arc<ManifestMerger>,
 
@@ -126,6 +129,7 @@ impl Manifest {
     ) -> Result<Self> {
         let snapshot_path = 
Path::from(format!("{root_dir}/{PREFIX_PATH}/{SNAPSHOT_FILENAME}"));
         let delta_dir = 
Path::from(format!("{root_dir}/{PREFIX_PATH}/{DELTA_PREFIX}"));
+        let tombstone_dir = 
Path::from(format!("{root_dir}/{PREFIX_PATH}/{TOMBSTONE_PREFIX}"));
 
         let merger = ManifestMerger::try_new(
             snapshot_path.clone(),
@@ -150,6 +154,7 @@ impl Manifest {
 
         Ok(Self {
             delta_dir,
+            tombstone_dir,
             store,
             merger,
             payload: RwLock::new(payload),
@@ -187,6 +192,15 @@ impl Manifest {
         Ok(())
     }
 
+    // TODO: recovery tombstone files when startup
+    pub async fn add_tombstone_files<I>(&self, files: I) -> Result<()>
+    where
+        I: IntoIterator<Item = SstFile>,
+    {
+        let path = Path::from(format!("{}/{}", self.tombstone_dir, 
Uuid::new_v4()));
+        todo!()
+    }
+
     // TODO: avoid clone
     pub async fn all_ssts(&self) -> Vec<SstFile> {
         let payload = self.payload.read().await;
diff --git a/src/metric_engine/src/read.rs b/src/metric_engine/src/read.rs
index 7769ff21..1a5abc48 100644
--- a/src/metric_engine/src/read.rs
+++ b/src/metric_engine/src/read.rs
@@ -28,14 +28,25 @@ use arrow::{
 };
 use arrow_schema::SchemaRef;
 use datafusion::{
-    common::internal_err,
-    datasource::physical_plan::{FileMeta, ParquetFileReaderFactory},
+    common::{internal_err, DFSchema},
+    datasource::{
+        listing::PartitionedFile,
+        physical_plan::{FileMeta, FileScanConfig, ParquetExec, 
ParquetFileReaderFactory},
+    },
     error::{DataFusionError, Result as DfResult},
-    execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext},
+    execution::{
+        context::ExecutionProps, object_store::ObjectStoreUrl, 
RecordBatchStream,
+        SendableRecordBatchStream, TaskContext,
+    },
+    logical_expr::utils::conjunction,
     parquet::arrow::async_reader::AsyncFileReader,
+    physical_expr::{create_physical_expr, LexOrdering},
     physical_plan::{
-        metrics::ExecutionPlanMetricsSet, DisplayAs, Distribution, 
ExecutionPlan, PlanProperties,
+        metrics::ExecutionPlanMetricsSet, 
sorts::sort_preserving_merge::SortPreservingMergeExec,
+        DisplayAs, Distribution, ExecutionPlan, PlanProperties,
     },
+    physical_planner::create_physical_sort_exprs,
+    prelude::{ident, Expr},
 };
 use futures::{Stream, StreamExt};
 use itertools::Itertools;
@@ -44,8 +55,9 @@ use tracing::debug;
 
 use crate::{
     compare_primitive_columns,
-    operator::MergeOperator,
-    types::{ObjectStoreRef, SEQ_COLUMN_NAME},
+    operator::{BytesMergeOperator, LastValueOperator, MergeOperator, 
MergeOperatorRef},
+    sst::{SstFile, SstPathGenerator},
+    types::{ObjectStoreRef, StorageSchema, UpdateMode, SEQ_COLUMN_NAME},
     Result,
 };
 
@@ -183,7 +195,7 @@ struct MergeStream {
     stream: SendableRecordBatchStream,
     num_primary_keys: usize,
     seq_idx: usize,
-    value_operator: Arc<dyn MergeOperator>,
+    value_operator: MergeOperatorRef,
 
     pending_batch: Option<RecordBatch>,
     arrow_schema: SchemaRef,
@@ -194,7 +206,7 @@ impl MergeStream {
         stream: SendableRecordBatchStream,
         num_primary_keys: usize,
         seq_idx: usize,
-        value_operator: Arc<dyn MergeOperator>,
+        value_operator: MergeOperatorRef,
     ) -> Self {
         let fields = stream
             .schema()
@@ -356,14 +368,111 @@ impl RecordBatchStream for MergeStream {
     }
 }
 
+pub struct ParquetReader {
+    store: ObjectStoreRef,
+    schema: StorageSchema,
+    sst_path_gen: Arc<SstPathGenerator>,
+}
+
+impl ParquetReader {
+    pub fn new(
+        store: ObjectStoreRef,
+        schema: StorageSchema,
+        sst_path_gen: Arc<SstPathGenerator>,
+    ) -> Self {
+        Self {
+            store,
+            schema,
+            sst_path_gen,
+        }
+    }
+
+    fn build_sort_exprs(&self, df_schema: &DFSchema, sort_seq: bool) -> 
Result<LexOrdering> {
+        let mut sort_exprs = (0..self.schema.num_primary_keys)
+            .map(|i| {
+                ident(self.schema.arrow_schema.field(i).name())
+                    .sort(true /* asc */, true /* nulls_first */)
+            })
+            .collect::<Vec<_>>();
+        if sort_seq {
+            sort_exprs.push(ident(SEQ_COLUMN_NAME).sort(true, true));
+        }
+        let sort_exprs =
+            create_physical_sort_exprs(&sort_exprs, df_schema, 
&ExecutionProps::default())
+                .context("create physical sort exprs")?;
+
+        Ok(sort_exprs)
+    }
+
+    pub fn build_df_plan(
+        &self,
+        ssts: Vec<SstFile>,
+        projections: Option<Vec<usize>>,
+        predicates: Vec<Expr>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        // we won't use url for selecting object_store.
+        let dummy_url = ObjectStoreUrl::parse("empty://").unwrap();
+        let df_schema =
+            
DFSchema::try_from(self.schema.arrow_schema.clone()).context("build DFSchema")?;
+        let sort_exprs = self.build_sort_exprs(&df_schema, true /* sort_seq 
*/)?;
+
+        let file_groups = ssts
+            .into_iter()
+            .map(|f| {
+                vec![PartitionedFile::new(
+                    self.sst_path_gen.generate(f.id()),
+                    f.meta().size as u64,
+                )]
+            })
+            .collect::<Vec<_>>();
+        let scan_config = FileScanConfig::new(dummy_url, 
self.schema.arrow_schema.clone())
+            .with_output_ordering(vec![sort_exprs.clone(); file_groups.len()])
+            .with_file_groups(file_groups)
+            .with_projection(projections);
+
+        let mut builder = 
ParquetExec::builder(scan_config).with_parquet_file_reader_factory(
+            Arc::new(DefaultParquetFileReaderFactory::new(self.store.clone())),
+        );
+        if let Some(expr) = conjunction(predicates) {
+            let filters = create_physical_expr(&expr, &df_schema, 
&ExecutionProps::new())
+                .context("create physical expr")?;
+            builder = builder.with_predicate(filters);
+        }
+
+        // TODO: fetch using multiple threads since read from parquet will 
incur CPU
+        // when convert between arrow and parquet.
+        let parquet_exec = builder.build();
+        let sort_exec = SortPreservingMergeExec::new(sort_exprs, 
Arc::new(parquet_exec))
+            // TODO: make fetch size configurable.
+            .with_fetch(Some(1024))
+            .with_round_robin_repartition(true);
+
+        let merge_exec = MergeExec::new(
+            Arc::new(sort_exec),
+            self.schema.num_primary_keys,
+            self.schema.seq_idx,
+            match self.schema.update_mode {
+                UpdateMode::Overwrite => Arc::new(LastValueOperator),
+                UpdateMode::Append => {
+                    
Arc::new(BytesMergeOperator::new(self.schema.value_idxes.clone()))
+                }
+            },
+        );
+        Ok(Arc::new(merge_exec))
+    }
+}
+
 #[cfg(test)]
 mod tests {
+    use object_store::local::LocalFileSystem;
     use test_log::test;
 
     use super::*;
     use crate::{
+        arrow_schema,
         operator::{BytesMergeOperator, LastValueOperator, MergeOperatorRef},
         record_batch,
+        sst::FileMeta,
         test_util::{check_stream, make_sendable_record_batches},
     };
 
@@ -379,7 +488,7 @@ mod tests {
             record_batch!(("pk1", UInt8, vec![14]), ("value", Binary, 
vec![b"9"])).unwrap(),
         ];
 
-        test_merge_stream_for_append_mode(Arc::new(LastValueOperator), 
expected).await;
+        test_merge_stream_inner(Arc::new(LastValueOperator), expected).await;
 
         let expected = [
             record_batch!(
@@ -391,11 +500,10 @@ mod tests {
             record_batch!(("pk1", UInt8, vec![14]), ("value", Binary, 
vec![b"9"])).unwrap(),
         ];
 
-        
test_merge_stream_for_append_mode(Arc::new(BytesMergeOperator::new(vec![1])), 
expected)
-            .await;
+        test_merge_stream_inner(Arc::new(BytesMergeOperator::new(vec![1])), 
expected).await;
     }
 
-    async fn test_merge_stream_for_append_mode<I>(merge_op: MergeOperatorRef, 
expected: I)
+    async fn test_merge_stream_inner<I>(merge_op: MergeOperatorRef, expected: 
I)
     where
         I: IntoIterator<Item = RecordBatch>,
     {
@@ -423,4 +531,50 @@ mod tests {
         let stream = MergeStream::new(stream, 1, 2, merge_op);
         check_stream(Box::pin(stream), expected).await;
     }
+
+    #[tokio::test]
+    async fn test_build_scan_plan() {
+        let schema = arrow_schema!(("pk1", UInt8), ("value", UInt8), 
(SEQ_COLUMN_NAME, UInt64));
+        let store = Arc::new(LocalFileSystem::new());
+        let reader = ParquetReader::new(
+            store,
+            StorageSchema {
+                arrow_schema: schema.clone(),
+                num_primary_keys: 1,
+                seq_idx: 2,
+                value_idxes: vec![1],
+                update_mode: UpdateMode::Overwrite,
+            },
+            Arc::new(SstPathGenerator::new("mock".to_string())),
+        );
+        let plan = reader
+            .build_df_plan(
+                (100..103)
+                    .map(|id| {
+                        SstFile::new(
+                            id,
+                            FileMeta {
+                                max_sequence: id,
+                                num_rows: 1,
+                                size: 1,
+                                time_range: (1..10).into(),
+                            },
+                        )
+                    })
+                    .collect(),
+                None,
+                vec![],
+            )
+            .unwrap();
+        let display_plan =
+            
datafusion::physical_plan::display::DisplayableExecutionPlan::new(plan.as_ref())
+                .indent(true);
+        assert_eq!(
+            r#"MergeExec: [primary_keys: 1, seq_idx: 2]
+  SortPreservingMergeExec: [pk1@0 ASC, __seq__@2 ASC], fetch=1024
+    ParquetExec: file_groups={3 groups: [[mock/data/100.sst], 
[mock/data/101.sst], [mock/data/102.sst]]}, projection=[pk1, value, __seq__], 
output_orderings=[[pk1@0 ASC, __seq__@2 ASC], [pk1@0 ASC, __seq__@2 ASC], 
[pk1@0 ASC, __seq__@2 ASC]]
+"#,
+            format!("{display_plan}")
+        );
+    }
 }
diff --git a/src/metric_engine/src/storage.rs b/src/metric_engine/src/storage.rs
index 0aa25361..37cc904e 100644
--- a/src/metric_engine/src/storage.rs
+++ b/src/metric_engine/src/storage.rs
@@ -25,20 +25,14 @@ use arrow::{
 use arrow_schema::{DataType, Field, Schema};
 use async_trait::async_trait;
 use datafusion::{
+    self,
     common::DFSchema,
-    datasource::{
-        listing::PartitionedFile,
-        physical_plan::{FileScanConfig, ParquetExec},
-    },
-    execution::{context::ExecutionProps, object_store::ObjectStoreUrl, 
SendableRecordBatchStream},
-    logical_expr::{utils::conjunction, Expr},
-    physical_expr::{create_physical_expr, LexOrdering},
+    execution::{context::ExecutionProps, SendableRecordBatchStream},
+    logical_expr::Expr,
+    physical_expr::LexOrdering,
     physical_plan::{
-        execute_stream,
-        memory::MemoryExec,
-        sorts::{sort::SortExec, 
sort_preserving_merge::SortPreservingMergeExec},
-        union::UnionExec,
-        EmptyRecordBatchStream, ExecutionPlan,
+        execute_stream, memory::MemoryExec, sorts::sort::SortExec, 
union::UnionExec,
+        EmptyRecordBatchStream,
     },
     physical_planner::create_physical_sort_exprs,
     prelude::{ident, SessionContext},
@@ -58,11 +52,10 @@ use crate::{
     compaction::{CompactionScheduler, SchedulerConfig},
     ensure,
     manifest::{Manifest, ManifestRef},
-    operator::{BytesMergeOperator, LastValueOperator},
-    read::{DefaultParquetFileReaderFactory, MergeExec},
-    sst::{allocate_id, FileMeta, SstFile, SstPathGenerator},
+    read::ParquetReader,
+    sst::{allocate_id, FileMeta, SstPathGenerator},
     types::{
-        ObjectStoreRef, RuntimeOptions, StorageOptions, TimeRange, UpdateMode, 
WriteOptions,
+        ObjectStoreRef, RuntimeOptions, StorageOptions, StorageSchema, 
TimeRange, WriteOptions,
         WriteResult, SEQ_COLUMN_NAME,
     },
     Result,
@@ -136,15 +129,10 @@ pub struct CloudObjectStorage {
     segment_duration: Duration,
     path: String,
     store: ObjectStoreRef,
-    arrow_schema: SchemaRef,
-    num_primary_keys: usize,
-    seq_idx: usize,
-    value_idxes: Vec<usize>,
-    update_mode: UpdateMode,
+    schema: StorageSchema,
     manifest: ManifestRef,
     runtimes: StorageRuntimes,
-
-    df_schema: DFSchema,
+    parquet_reader: Arc<ParquetReader>,
     write_props: WriterProperties,
     sst_path_gen: Arc<SstPathGenerator>,
     compact_scheduler: CompactionScheduler,
@@ -170,9 +158,6 @@ impl CloudObjectStorage {
         num_primary_keys: usize,
         storage_opts: StorageOptions,
     ) -> Result<Self> {
-        let value_idxes = 
(num_primary_keys..arrow_schema.fields.len()).collect::<Vec<_>>();
-        ensure!(!value_idxes.is_empty(), "no value column found");
-
         let runtimes = StorageRuntimes::new(storage_opts.runtime_opts)?;
         let manifest = Arc::new(
             Manifest::try_new(
@@ -183,42 +168,59 @@ impl CloudObjectStorage {
             )
             .await?,
         );
-
-        let mut new_fields = arrow_schema.fields.clone().to_vec();
-        new_fields.push(Arc::new(Field::new(
-            SEQ_COLUMN_NAME,
-            DataType::UInt64,
-            true,
-        )));
-        let seq_idx = new_fields.len() - 1;
-        let arrow_schema = Arc::new(Schema::new_with_metadata(
-            new_fields,
-            arrow_schema.metadata.clone(),
-        ));
-        let df_schema = 
DFSchema::try_from(arrow_schema.clone()).context("build DFSchema")?;
+        let schema = {
+            let value_idxes = 
(num_primary_keys..arrow_schema.fields.len()).collect::<Vec<_>>();
+            ensure!(!value_idxes.is_empty(), "no value column found");
+
+            let mut new_fields = arrow_schema.fields.clone().to_vec();
+            new_fields.push(Arc::new(Field::new(
+                SEQ_COLUMN_NAME,
+                DataType::UInt64,
+                true,
+            )));
+            let seq_idx = new_fields.len() - 1;
+            let arrow_schema = Arc::new(Schema::new_with_metadata(
+                new_fields,
+                arrow_schema.metadata.clone(),
+            ));
+            let update_mode = storage_opts.update_mode;
+            StorageSchema {
+                arrow_schema,
+                num_primary_keys,
+                seq_idx,
+                value_idxes,
+                update_mode,
+            }
+        };
         let write_props = Self::build_write_props(storage_opts.write_opts, 
num_primary_keys);
         let sst_path_gen = Arc::new(SstPathGenerator::new(path.clone()));
+        let parquet_reader = Arc::new(ParquetReader::new(
+            store.clone(),
+            schema.clone(),
+            sst_path_gen.clone(),
+        ));
+
         let compact_scheduler = CompactionScheduler::new(
             runtimes.sst_compact_runtime.clone(),
             manifest.clone(),
             store.clone(),
+            schema.clone(),
             segment_duration,
             sst_path_gen.clone(),
-            SchedulerConfig::default(),
+            parquet_reader.clone(),
+            SchedulerConfig {
+                write_props: write_props.clone(),
+                ..Default::default()
+            },
         );
-        let update_mode = storage_opts.update_mode;
         Ok(Self {
             path,
-            num_primary_keys,
-            seq_idx,
-            value_idxes,
-            update_mode,
+            schema,
             segment_duration,
             store,
-            arrow_schema,
             manifest,
+            parquet_reader,
             runtimes,
-            df_schema,
             write_props,
             sst_path_gen,
             compact_scheduler,
@@ -246,7 +248,7 @@ impl CloudObjectStorage {
                 // Since file_id in increasing order, we can use it as 
sequence.
                 let seq_column = Arc::new(UInt64Array::from(vec![file_id; 
batch.num_rows()]));
                 new_cols.push(seq_column);
-                RecordBatch::try_new(self.arrow_schema.clone(), new_cols)
+                RecordBatch::try_new(self.schema.arrow_schema.clone(), 
new_cols)
                     .context("construct record batch with seq column")?
             };
             writer
@@ -268,8 +270,8 @@ impl CloudObjectStorage {
         })
     }
 
-    fn build_sort_exprs(&self, sort_seq: bool) -> Result<LexOrdering> {
-        let mut sort_exprs = (0..self.num_primary_keys)
+    fn build_sort_exprs(&self, df_schema: &DFSchema, sort_seq: bool) -> 
Result<LexOrdering> {
+        let mut sort_exprs = (0..self.schema.num_primary_keys)
             .map(|i| {
                 ident(self.schema().field(i).name())
                     .sort(true /* asc */, true /* nulls_first */)
@@ -279,7 +281,7 @@ impl CloudObjectStorage {
             sort_exprs.push(ident(SEQ_COLUMN_NAME).sort(true, true));
         }
         let sort_exprs =
-            create_physical_sort_exprs(&sort_exprs, &self.df_schema, 
&ExecutionProps::default())
+            create_physical_sort_exprs(&sort_exprs, df_schema, 
&ExecutionProps::default())
                 .context("create physical sort exprs")?;
 
         Ok(sort_exprs)
@@ -288,7 +290,8 @@ impl CloudObjectStorage {
     async fn sort_batch(&self, batch: RecordBatch) -> 
Result<SendableRecordBatchStream> {
         let ctx = SessionContext::default();
         let schema = batch.schema();
-        let sort_exprs = self.build_sort_exprs(false /* sort_seq */)?;
+        let df_schema = 
DFSchema::try_from(self.schema().clone()).context("build DFSchema")?;
+        let sort_exprs = self.build_sort_exprs(&df_schema, false /* sort_seq 
*/)?;
         let batch_plan =
             MemoryExec::try_new(&[vec![batch]], schema, None).context("build 
batch plan")?;
         let physical_plan = Arc::new(SortExec::new(sort_exprs, 
Arc::new(batch_plan)));
@@ -339,65 +342,12 @@ impl CloudObjectStorage {
 
         builder.build()
     }
-
-    fn build_scan_plan(
-        &self,
-        ssts: Vec<SstFile>,
-        projections: Option<Vec<usize>>,
-        predicates: Vec<Expr>,
-    ) -> Result<Arc<dyn ExecutionPlan>> {
-        // we won't use url for selecting object_store.
-        let dummy_url = ObjectStoreUrl::parse("empty://").unwrap();
-        let sort_exprs = self.build_sort_exprs(true /* sort_seq */)?;
-
-        let file_groups = ssts
-            .into_iter()
-            .map(|f| {
-                vec![PartitionedFile::new(
-                    self.sst_path_gen.generate(f.id()),
-                    f.meta().size as u64,
-                )]
-            })
-            .collect::<Vec<_>>();
-        let scan_config = FileScanConfig::new(dummy_url, self.schema().clone())
-            .with_output_ordering(vec![sort_exprs.clone(); file_groups.len()])
-            .with_file_groups(file_groups)
-            .with_projection(projections);
-
-        let mut builder = 
ParquetExec::builder(scan_config).with_parquet_file_reader_factory(
-            Arc::new(DefaultParquetFileReaderFactory::new(self.store.clone())),
-        );
-        if let Some(expr) = conjunction(predicates) {
-            let filters = create_physical_expr(&expr, &self.df_schema, 
&ExecutionProps::new())
-                .context("create physical expr")?;
-            builder = builder.with_predicate(filters);
-        }
-
-        // TODO: fetch using multiple threads since read from parquet will 
incur CPU
-        // when convert between arrow and parquet.
-        let parquet_exec = builder.build();
-        let sort_exec = SortPreservingMergeExec::new(sort_exprs, 
Arc::new(parquet_exec))
-            // TODO: make fetch size configurable.
-            .with_fetch(Some(1024))
-            .with_round_robin_repartition(true);
-
-        let merge_exec = MergeExec::new(
-            Arc::new(sort_exec),
-            self.num_primary_keys,
-            self.seq_idx,
-            match self.update_mode {
-                UpdateMode::Overwrite => Arc::new(LastValueOperator),
-                UpdateMode::Append => 
Arc::new(BytesMergeOperator::new(self.value_idxes.clone())),
-            },
-        );
-        Ok(Arc::new(merge_exec))
-    }
 }
 
 #[async_trait]
 impl TimeMergeStorage for CloudObjectStorage {
     fn schema(&self) -> &SchemaRef {
-        &self.arrow_schema
+        &self.schema.arrow_schema
     }
 
     async fn write(&self, req: WriteRequest) -> Result<()> {
@@ -432,7 +382,7 @@ impl TimeMergeStorage for CloudObjectStorage {
         let total_ssts = self.manifest.find_ssts(&req.range).await;
         if total_ssts.is_empty() {
             return Ok(Box::pin(EmptyRecordBatchStream::new(
-                self.arrow_schema.clone(),
+                self.schema.arrow_schema.clone(),
             )));
         }
 
@@ -442,8 +392,11 @@ impl TimeMergeStorage for CloudObjectStorage {
 
         let mut plan_for_all_segments = Vec::new();
         for (_, ssts) in ssts_by_segment.sorted_by(|a, b| a.0.cmp(&b.0)) {
-            let plan =
-                self.build_scan_plan(ssts, req.projections.clone(), 
req.predicate.clone())?;
+            let plan = self.parquet_reader.build_df_plan(
+                ssts,
+                req.projections.clone(),
+                req.predicate.clone(),
+            )?;
 
             plan_for_all_segments.push(plan);
         }
@@ -473,51 +426,6 @@ mod tests {
     use super::*;
     use crate::{arrow_schema, record_batch, test_util::check_stream, 
types::Timestamp};
 
-    #[tokio::test]
-    async fn test_build_scan_plan() {
-        let schema = arrow_schema!(("pk1", UInt8), ("value", UInt8));
-        let store = Arc::new(LocalFileSystem::new());
-        let storage = CloudObjectStorage::try_new(
-            "mock".to_string(),
-            Duration::from_hours(2),
-            store,
-            schema.clone(),
-            1, // num_primary_keys
-            StorageOptions::default(),
-        )
-        .await
-        .unwrap();
-        let plan = storage
-            .build_scan_plan(
-                (100..103)
-                    .map(|id| {
-                        SstFile::new(
-                            id,
-                            FileMeta {
-                                max_sequence: id,
-                                num_rows: 1,
-                                size: 1,
-                                time_range: (1..10).into(),
-                            },
-                        )
-                    })
-                    .collect(),
-                None,
-                vec![],
-            )
-            .unwrap();
-        let display_plan =
-            
datafusion::physical_plan::display::DisplayableExecutionPlan::new(plan.as_ref())
-                .indent(true);
-        assert_eq!(
-            r#"MergeExec: [primary_keys: 1, seq_idx: 2]
-  SortPreservingMergeExec: [pk1@0 ASC, __seq__@2 ASC], fetch=1024
-    ParquetExec: file_groups={3 groups: [[mock/data/100.sst], 
[mock/data/101.sst], [mock/data/102.sst]]}, projection=[pk1, value, __seq__], 
output_orderings=[[pk1@0 ASC, __seq__@2 ASC], [pk1@0 ASC, __seq__@2 ASC], 
[pk1@0 ASC, __seq__@2 ASC]]
-"#,
-            format!("{display_plan}")
-        );
-    }
-
     #[test(tokio::test)]
     async fn test_storage_write_and_scan() {
         let schema = arrow_schema!(("pk1", UInt8), ("pk2", UInt8), ("value", 
Int64));
diff --git a/src/metric_engine/src/types.rs b/src/metric_engine/src/types.rs
index c4972fe3..03d75ed4 100644
--- a/src/metric_engine/src/types.rs
+++ b/src/metric_engine/src/types.rs
@@ -22,6 +22,7 @@ use std::{
     time::Duration,
 };
 
+use arrow_schema::SchemaRef;
 use object_store::ObjectStore;
 use parquet::basic::{Compression, Encoding, ZstdLevel};
 use tokio::runtime::Runtime;
@@ -111,6 +112,11 @@ impl TimeRange {
     pub fn overlaps(&self, other: &TimeRange) -> bool {
         self.0.start < other.0.end && other.0.start < self.0.end
     }
+
+    pub fn merge(&mut self, other: &TimeRange) {
+        self.0.start = self.0.start.min(other.0.start);
+        self.0.end = self.0.end.max(other.0.end);
+    }
 }
 
 pub type ObjectStoreRef = Arc<dyn ObjectStore>;
@@ -194,7 +200,7 @@ impl Default for ManifestMergeOptions {
     }
 }
 
-#[derive(Debug, Default)]
+#[derive(Debug, Default, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
 pub enum UpdateMode {
     #[default]
     Overwrite,
@@ -209,6 +215,15 @@ pub struct StorageOptions {
     pub update_mode: UpdateMode,
 }
 
+#[derive(Debug, Clone)]
+pub struct StorageSchema {
+    pub arrow_schema: SchemaRef,
+    pub num_primary_keys: usize,
+    pub seq_idx: usize,
+    pub value_idxes: Vec<usize>,
+    pub update_mode: UpdateMode,
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to