This is an automated email from the ASF dual-hosted git repository.
jiacai2050 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/horaedb.git
The following commit(s) were added to refs/heads/main by this push:
new 5d7c6a7f fix: scan with predicate (#1617)
5d7c6a7f is described below
commit 5d7c6a7ff651a84e08efdb70ce8df496c9b86dcb
Author: 鲍金日 <[email protected]>
AuthorDate: Mon Dec 23 17:23:57 2024 +0800
fix: scan with predicate (#1617)
## Rationale
- Support scan with predicates: filters passed to a scan request are now applied exactly to the returned rows.

## Detailed Changes
- Push the predicate into `ParquetExec` for statistics-based pruning, and wrap the scan in a `FilterExec` so rows are filtered exactly.
- Drop an unused `trigger_tx` clone in the compaction scheduler.
- `Cargo.lock`: pin `itertools` back to 0.10.5 for the prost build dependencies.

## Test Plan
CI
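After this change a caller can attach filters to a scan. The sketch below mirrors the new storage test; `storage` is a placeholder name for a `CloudObjectStorage` (any `TimeMergeStorage`) handle, and error handling is elided:

```rust
use datafusion::logical_expr::{col, lit};

// `storage` is a placeholder for a CloudObjectStorage handle; ScanRequest,
// TimeRange and Timestamp are the metric engine types touched by this patch.
let stream = storage
    .scan(ScanRequest {
        // Scan the full time range.
        range: TimeRange::new(Timestamp(0), Timestamp::MAX),
        // Only rows matching `pk1 = 11` are returned.
        predicate: vec![col("pk1").eq(lit(11_u8))],
        projections: None,
    })
    .await?;
```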
---
Cargo.lock | 4 +--
src/metric_engine/src/compaction/scheduler.rs | 1 -
src/metric_engine/src/read.rs | 40 +++++++++++++++++++--------
src/metric_engine/src/storage.rs | 27 ++++++++++++++++++
4 files changed, 57 insertions(+), 15 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 8cace345..85001036 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2637,7 +2637,7 @@ checksum = "0c1318b19085f08681016926435853bbf7858f9c082d0999b80550ff5d9abe15"
dependencies = [
"bytes",
"heck",
- "itertools 0.13.0",
+ "itertools 0.10.5",
"log",
"multimap",
"once_cell",
@@ -2657,7 +2657,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e9552f850d5f0964a4e4d0bf306459ac29323ddfbae05e35a7c0d35cb0803cc5"
dependencies = [
"anyhow",
- "itertools 0.13.0",
+ "itertools 0.10.5",
"proc-macro2",
"quote",
"syn",
diff --git a/src/metric_engine/src/compaction/scheduler.rs b/src/metric_engine/src/compaction/scheduler.rs
index d947096e..581d397b 100644
--- a/src/metric_engine/src/compaction/scheduler.rs
+++ b/src/metric_engine/src/compaction/scheduler.rs
@@ -64,7 +64,6 @@ impl Scheduler {
let task_handle = {
let store = store.clone();
let manifest = manifest.clone();
- let trigger_tx = trigger_tx.clone();
let executor = Executor::new(
runtime.clone(),
store,
diff --git a/src/metric_engine/src/read.rs b/src/metric_engine/src/read.rs
index 1dea5c6c..0d274337 100644
--- a/src/metric_engine/src/read.rs
+++ b/src/metric_engine/src/read.rs
@@ -42,8 +42,9 @@ use datafusion::{
    parquet::arrow::async_reader::AsyncFileReader,
    physical_expr::{create_physical_expr, LexOrdering},
    physical_plan::{
-        metrics::ExecutionPlanMetricsSet, sorts::sort_preserving_merge::SortPreservingMergeExec,
-        DisplayAs, Distribution, ExecutionPlan, PlanProperties,
+        filter::FilterExec, metrics::ExecutionPlanMetricsSet,
+        sorts::sort_preserving_merge::SortPreservingMergeExec, DisplayAs, Distribution,
+        ExecutionPlan, PlanProperties,
    },
    physical_planner::create_physical_sort_exprs,
    prelude::{ident, Expr},
@@ -430,17 +431,28 @@ impl ParquetReader {
        let mut builder = ParquetExec::builder(scan_config).with_parquet_file_reader_factory(
            Arc::new(DefaultParquetFileReaderFactory::new(self.store.clone())),
        );
-        if let Some(expr) = conjunction(predicates) {
-            let filters = create_physical_expr(&expr, &df_schema, &ExecutionProps::new())
-                .context("create physical expr")?;
-            builder = builder.with_predicate(filters);
-        }
+        let base_plan: Arc<dyn ExecutionPlan> = match conjunction(predicates) {
+            Some(expr) => {
+                let filters = create_physical_expr(&expr, &df_schema, &ExecutionProps::new())
+                    .context("create physical expr")?;
+
+                builder = builder.with_predicate(filters.clone());
+                let parquet_exec = builder.build();
+
+                let filter_exec = FilterExec::try_new(filters, Arc::new(parquet_exec))
+                    .context("create filter exec")?;
+                Arc::new(filter_exec)
+            }
+            None => {
+                let parquet_exec = builder.build();
+                Arc::new(parquet_exec)
+            }
+        };
        // TODO: fetch using multiple threads since read from parquet will incur CPU
        // when convert between arrow and parquet.
-        let parquet_exec = builder.build();
-        let sort_exec = SortPreservingMergeExec::new(sort_exprs, Arc::new(parquet_exec))
-            .with_round_robin_repartition(true);
+        let sort_exec =
+            SortPreservingMergeExec::new(sort_exprs, base_plan).with_round_robin_repartition(true);
let merge_exec = MergeExec::new(
Arc::new(sort_exec),
@@ -459,6 +471,7 @@ impl ParquetReader {
#[cfg(test)]
mod tests {
+ use datafusion::logical_expr::{col, lit};
use object_store::local::LocalFileSystem;
use test_log::test;
@@ -542,6 +555,8 @@ mod tests {
},
Arc::new(SstPathGenerator::new("mock".to_string())),
);
+
+ let expr = col("pk1").eq(lit(0_u8));
let plan = reader
.build_df_plan(
(100..103)
@@ -558,7 +573,7 @@ mod tests {
})
.collect(),
None,
- vec![],
+ vec![expr],
)
.unwrap();
let display_plan =
@@ -567,7 +582,8 @@ mod tests {
        assert_eq!(
            r#"MergeExec: [primary_keys: 1, seq_idx: 2]
  SortPreservingMergeExec: [pk1@0 ASC, __seq__@2 ASC]
-    ParquetExec: file_groups={3 groups: [[mock/data/100.sst], [mock/data/101.sst], [mock/data/102.sst]]}, projection=[pk1, value, __seq__], output_orderings=[[pk1@0 ASC, __seq__@2 ASC], [pk1@0 ASC, __seq__@2 ASC], [pk1@0 ASC, __seq__@2 ASC]]
+    FilterExec: pk1@0 = 0
+      ParquetExec: file_groups={3 groups: [[mock/data/100.sst], [mock/data/101.sst], [mock/data/102.sst]]}, projection=[pk1, value, __seq__], output_orderings=[[pk1@0 ASC, __seq__@2 ASC], [pk1@0 ASC, __seq__@2 ASC], [pk1@0 ASC, __seq__@2 ASC]], predicate=pk1@0 = 0, pruning_predicate=CASE WHEN pk1_null_count@2 = pk1_row_count@3 THEN false ELSE pk1_min@0 <= 0 AND 0 <= pk1_max@1 END, required_guarantees=[pk1 in (0)]
"#,
format!("{display_plan}")
);
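The key pattern in the read.rs change: the predicate is handed to `ParquetExec` via `with_predicate` so statistics-based pruning can skip files and row groups, and the same expression is then re-applied exactly by a `FilterExec` wrapped around the scan. A minimal standalone sketch of that pattern, assuming a recent DataFusion release (where `EmptyExec::new` takes only a schema); `EmptyExec` merely stands in for the real `ParquetExec`:

```rust
// A minimal sketch (not HoraeDB code) of the FilterExec-over-scan pattern.
use std::sync::Arc;

use datafusion::{
    arrow::datatypes::{DataType, Field, Schema},
    common::DFSchema,
    error::Result,
    execution::context::ExecutionProps,
    logical_expr::{col, lit},
    physical_expr::create_physical_expr,
    physical_plan::{empty::EmptyExec, filter::FilterExec, ExecutionPlan},
};

fn build_filtered_plan() -> Result<Arc<dyn ExecutionPlan>> {
    // Stand-in for the ParquetExec built by ParquetReader::build_df_plan.
    let schema = Arc::new(Schema::new(vec![Field::new("pk1", DataType::UInt8, false)]));
    let scan: Arc<dyn ExecutionPlan> = Arc::new(EmptyExec::new(schema.clone()));

    // Lower the logical predicate `pk1 = 0` to a physical expression,
    // exactly as the patch does with create_physical_expr.
    let df_schema = DFSchema::try_from(schema.as_ref().clone())?;
    let predicate = create_physical_expr(
        &col("pk1").eq(lit(0_u8)),
        &df_schema,
        &ExecutionProps::new(),
    )?;

    // FilterExec re-applies the predicate row by row; pruning inside the
    // scan only skips whole files/row groups whose stats rule it out.
    Ok(Arc::new(FilterExec::try_new(predicate, scan)?))
}
```

Without the outer `FilterExec`, pruning alone can only discard row groups whose statistics exclude the predicate, so non-matching rows could still leak through the scan.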
diff --git a/src/metric_engine/src/storage.rs b/src/metric_engine/src/storage.rs
index b86ad803..2d5b3a6b 100644
--- a/src/metric_engine/src/storage.rs
+++ b/src/metric_engine/src/storage.rs
@@ -404,6 +404,7 @@ impl TimeMergeStorage for CloudObjectStorage {
#[cfg(test)]
mod tests {
+ use datafusion::logical_expr::{col, lit};
use object_store::local::LocalFileSystem;
use test_log::test;
@@ -488,6 +489,32 @@ mod tests {
];
check_stream(result_stream, expected_batch).await;
+
+ // test with predicate
+ let expr = col("pk1").eq(lit(11_u8));
+ let result_stream = storage
+ .scan(ScanRequest {
+ range: TimeRange::new(Timestamp(0), Timestamp::MAX),
+ predicate: vec![expr],
+ projections: None,
+ })
+ .await
+ .unwrap();
+ let expected_batch = [
+ record_batch!(
+ ("pk1", UInt8, vec![11]),
+ ("pk2", UInt8, vec![99]),
+ ("value", Int64, vec![77])
+ )
+ .unwrap(),
+ record_batch!(
+ ("pk1", UInt8, vec![11]),
+ ("pk2", UInt8, vec![100]),
+ ("value", Int64, vec![22])
+ )
+ .unwrap(),
+ ];
+ check_stream(result_stream, expected_batch).await;
});
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]