This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 03fbf9fec refactor: ParquetExec logical expr. => phys. expr. (#5419)
03fbf9fec is described below
commit 03fbf9fecad00f8d6eb3e72e72ba16252b28b1d6
Author: Marco Neumann <[email protected]>
AuthorDate: Wed Mar 1 17:24:55 2023 +0100
refactor: ParquetExec logical expr. => phys. expr. (#5419)
* feat: `get_phys_expr_columns` util
* feat: add `reassign_predicate_columns` util
* feat: `PhysicalExprRef` type alias
* refactor: `ParquetExec` logical expr. => phys. expr.
Use `Arc<dyn PhysicalExpr>` instead of `Expr` within `ParquetExec` and
move lowering from logical to physical expression into plan lowering
(e.g. `ListingTable`).
This is in line with all other physical plan nodes (e.g. `FilterExpr`). It not
only simplifies reasoning within the physical optimizer but also allows correct
passing of `ExecutionProps` into the conversion.
Closes #4695.
---
datafusion/core/src/datasource/file_format/avro.rs | 4 +-
datafusion/core/src/datasource/file_format/csv.rs | 4 +-
datafusion/core/src/datasource/file_format/json.rs | 4 +-
datafusion/core/src/datasource/file_format/mod.rs | 6 +-
.../core/src/datasource/file_format/parquet.rs | 14 +-
datafusion/core/src/datasource/listing/table.rs | 20 +-
datafusion/core/src/physical_optimizer/pruning.rs | 955 +++++++++++++--------
.../core/src/physical_plan/file_format/parquet.rs | 34 +-
.../file_format/parquet/page_filter.rs | 21 +-
.../file_format/parquet/row_filter.rs | 101 +--
.../file_format/parquet/row_groups.rs | 68 +-
datafusion/core/tests/parquet/page_pruning.rs | 11 +-
datafusion/core/tests/row.rs | 2 +-
datafusion/physical-expr/src/lib.rs | 2 +-
datafusion/physical-expr/src/physical_expr.rs | 3 +
datafusion/physical-expr/src/utils.rs | 78 ++
datafusion/proto/proto/datafusion.proto | 6 +-
datafusion/proto/src/generated/pbjson.rs | 25 +-
datafusion/proto/src/generated/prost.rs | 4 +-
datafusion/proto/src/physical_plan/mod.rs | 36 +-
parquet-test-utils/src/lib.rs | 6 +-
21 files changed, 907 insertions(+), 497 deletions(-)
diff --git a/datafusion/core/src/datasource/file_format/avro.rs
b/datafusion/core/src/datasource/file_format/avro.rs
index 1b6d2b3bc..422733308 100644
--- a/datafusion/core/src/datasource/file_format/avro.rs
+++ b/datafusion/core/src/datasource/file_format/avro.rs
@@ -23,13 +23,13 @@ use std::sync::Arc;
use arrow::datatypes::Schema;
use arrow::{self, datatypes::SchemaRef};
use async_trait::async_trait;
+use datafusion_physical_expr::PhysicalExpr;
use object_store::{GetResult, ObjectMeta, ObjectStore};
use super::FileFormat;
use crate::avro_to_arrow::read_avro_schema_from_reader;
use crate::error::Result;
use crate::execution::context::SessionState;
-use crate::logical_expr::Expr;
use crate::physical_plan::file_format::{AvroExec, FileScanConfig};
use crate::physical_plan::ExecutionPlan;
use crate::physical_plan::Statistics;
@@ -82,7 +82,7 @@ impl FileFormat for AvroFormat {
&self,
_state: &SessionState,
conf: FileScanConfig,
- _filters: &[Expr],
+ _filters: Option<&Arc<dyn PhysicalExpr>>,
) -> Result<Arc<dyn ExecutionPlan>> {
let exec = AvroExec::new(conf);
Ok(Arc::new(exec))
diff --git a/datafusion/core/src/datasource/file_format/csv.rs
b/datafusion/core/src/datasource/file_format/csv.rs
index 85a9d186a..fcab651e3 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -29,6 +29,7 @@ use bytes::{Buf, Bytes};
use datafusion_common::DataFusionError;
+use datafusion_physical_expr::PhysicalExpr;
use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
use object_store::{delimited::newline_delimited_stream, ObjectMeta,
ObjectStore};
@@ -37,7 +38,6 @@ use
crate::datasource::file_format::file_type::FileCompressionType;
use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
use crate::error::Result;
use crate::execution::context::SessionState;
-use crate::logical_expr::Expr;
use crate::physical_plan::file_format::{CsvExec, FileScanConfig};
use crate::physical_plan::ExecutionPlan;
use crate::physical_plan::Statistics;
@@ -154,7 +154,7 @@ impl FileFormat for CsvFormat {
&self,
_state: &SessionState,
conf: FileScanConfig,
- _filters: &[Expr],
+ _filters: Option<&Arc<dyn PhysicalExpr>>,
) -> Result<Arc<dyn ExecutionPlan>> {
let exec = CsvExec::new(
conf,
diff --git a/datafusion/core/src/datasource/file_format/json.rs
b/datafusion/core/src/datasource/file_format/json.rs
index 7b0b0e18d..a66edab88 100644
--- a/datafusion/core/src/datasource/file_format/json.rs
+++ b/datafusion/core/src/datasource/file_format/json.rs
@@ -29,6 +29,7 @@ use arrow::json::reader::ValueIter;
use async_trait::async_trait;
use bytes::Buf;
+use datafusion_physical_expr::PhysicalExpr;
use object_store::{GetResult, ObjectMeta, ObjectStore};
use super::FileFormat;
@@ -37,7 +38,6 @@ use
crate::datasource::file_format::file_type::FileCompressionType;
use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
use crate::error::Result;
use crate::execution::context::SessionState;
-use crate::logical_expr::Expr;
use crate::physical_plan::file_format::NdJsonExec;
use crate::physical_plan::ExecutionPlan;
use crate::physical_plan::Statistics;
@@ -143,7 +143,7 @@ impl FileFormat for JsonFormat {
&self,
_state: &SessionState,
conf: FileScanConfig,
- _filters: &[Expr],
+ _filters: Option<&Arc<dyn PhysicalExpr>>,
) -> Result<Arc<dyn ExecutionPlan>> {
let exec = NdJsonExec::new(conf,
self.file_compression_type.to_owned());
Ok(Arc::new(exec))
diff --git a/datafusion/core/src/datasource/file_format/mod.rs
b/datafusion/core/src/datasource/file_format/mod.rs
index 947327630..52da7285e 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -33,12 +33,12 @@ use std::sync::Arc;
use crate::arrow::datatypes::SchemaRef;
use crate::error::Result;
-use crate::logical_expr::Expr;
use crate::physical_plan::file_format::FileScanConfig;
use crate::physical_plan::{ExecutionPlan, Statistics};
use crate::execution::context::SessionState;
use async_trait::async_trait;
+use datafusion_physical_expr::PhysicalExpr;
use object_store::{ObjectMeta, ObjectStore};
/// This trait abstracts all the file format specific implementations
@@ -84,7 +84,7 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
&self,
state: &SessionState,
conf: FileScanConfig,
- filters: &[Expr],
+ filters: Option<&Arc<dyn PhysicalExpr>>,
) -> Result<Arc<dyn ExecutionPlan>>;
}
@@ -143,7 +143,7 @@ pub(crate) mod test_util {
output_ordering: None,
infinite_source: false,
},
- &[],
+ None,
)
.await?;
Ok(exec)
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs
b/datafusion/core/src/datasource/file_format/parquet.rs
index 0a7a7cadc..53e94167d 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -25,7 +25,7 @@ use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
use bytes::{BufMut, BytesMut};
use datafusion_common::DataFusionError;
-use datafusion_optimizer::utils::conjunction;
+use datafusion_physical_expr::PhysicalExpr;
use hashbrown::HashMap;
use object_store::{ObjectMeta, ObjectStore};
use parquet::arrow::parquet_to_arrow_schema;
@@ -44,7 +44,6 @@ use crate::config::ConfigOptions;
use crate::datasource::{create_max_min_accs, get_col_stats};
use crate::error::Result;
use crate::execution::context::SessionState;
-use crate::logical_expr::Expr;
use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};
use crate::physical_plan::file_format::{ParquetExec, SchemaAdapter};
use crate::physical_plan::{Accumulator, ExecutionPlan, Statistics};
@@ -189,16 +188,15 @@ impl FileFormat for ParquetFormat {
&self,
state: &SessionState,
conf: FileScanConfig,
- filters: &[Expr],
+ filters: Option<&Arc<dyn PhysicalExpr>>,
) -> Result<Arc<dyn ExecutionPlan>> {
// If enable pruning then combine the filters to build the predicate.
// If disable pruning then set the predicate to None, thus readers
// will not prune data based on the statistics.
- let predicate = if self.enable_pruning(state.config_options()) {
- conjunction(filters.to_vec())
- } else {
- None
- };
+ let predicate = self
+ .enable_pruning(state.config_options())
+ .then(|| filters.cloned())
+ .flatten();
Ok(Arc::new(ParquetExec::new(
conf,
diff --git a/datafusion/core/src/datasource/listing/table.rs
b/datafusion/core/src/datasource/listing/table.rs
index 29e2259e4..f6d9c959e 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -24,8 +24,10 @@ use arrow::compute::SortOptions;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use async_trait::async_trait;
use dashmap::DashMap;
+use datafusion_common::ToDFSchema;
use datafusion_expr::expr::Sort;
-use datafusion_physical_expr::PhysicalSortExpr;
+use datafusion_optimizer::utils::conjunction;
+use datafusion_physical_expr::{create_physical_expr, PhysicalSortExpr};
use futures::{future, stream, StreamExt, TryStreamExt};
use object_store::path::Path;
use object_store::ObjectMeta;
@@ -661,6 +663,20 @@ impl TableProvider for ListingTable {
})
.collect::<Result<Vec<_>>>()?;
+ let filters = if let Some(expr) = conjunction(filters.to_vec()) {
+ // NOTE: Use the table schema (NOT file schema) here because
`expr` may contain references to partition columns.
+ let table_df_schema =
self.table_schema.as_ref().clone().to_dfschema()?;
+ let filters = create_physical_expr(
+ &expr,
+ &table_df_schema,
+ &self.table_schema,
+ state.execution_props(),
+ )?;
+ Some(filters)
+ } else {
+ None
+ };
+
// create the execution plan
self.options
.format
@@ -677,7 +693,7 @@ impl TableProvider for ListingTable {
table_partition_cols,
infinite_source: self.infinite_source,
},
- filters,
+ filters.as_ref(),
)
.await
}
diff --git a/datafusion/core/src/physical_optimizer/pruning.rs
b/datafusion/core/src/physical_optimizer/pruning.rs
index 03e376878..fbf000148 100644
--- a/datafusion/core/src/physical_optimizer/pruning.rs
+++ b/datafusion/core/src/physical_optimizer/pruning.rs
@@ -32,14 +32,13 @@ use std::collections::HashSet;
use std::convert::TryFrom;
use std::sync::Arc;
-use crate::execution::context::ExecutionProps;
-use crate::prelude::lit;
use crate::{
common::{Column, DFSchema},
error::{DataFusionError, Result},
- logical_expr::{Expr, Operator},
+ logical_expr::Operator,
physical_plan::{ColumnarValue, PhysicalExpr},
};
+use arrow::compute::DEFAULT_CAST_OPTIONS;
use arrow::record_batch::RecordBatchOptions;
use arrow::{
array::{new_null_array, ArrayRef, BooleanArray},
@@ -47,11 +46,10 @@ use arrow::{
record_batch::RecordBatch,
};
use datafusion_common::{downcast_value, ScalarValue};
-use datafusion_expr::expr::{BinaryExpr, Cast, TryCast};
-use datafusion_expr::expr_rewriter::rewrite_expr;
-use datafusion_expr::{binary_expr, cast, try_cast, ExprSchemable};
-use datafusion_physical_expr::create_physical_expr;
-use datafusion_physical_expr::expressions::Literal;
+use datafusion_physical_expr::{expressions as phys_expr, PhysicalExprRef};
+
+use datafusion_physical_expr::rewrite::{TreeNodeRewritable, TreeNodeRewriter};
+use datafusion_physical_expr::utils::get_phys_expr_columns;
use log::trace;
/// Interface to pass statistics information to [`PruningPredicate`]
@@ -104,8 +102,8 @@ pub struct PruningPredicate {
predicate_expr: Arc<dyn PhysicalExpr>,
/// The statistics required to evaluate this predicate
required_columns: RequiredStatColumns,
- /// Logical predicate from which this predicate expr is derived (required
for serialization)
- logical_expr: Expr,
+ /// Original physical predicate from which this predicate expr is derived
(required for serialization)
+ orig_expr: Arc<dyn PhysicalExpr>,
}
impl PruningPredicate {
@@ -128,31 +126,16 @@ impl PruningPredicate {
/// For example, the filter expression `(column / 2) = 4` becomes
/// the pruning predicate
/// `(column_min / 2) <= 4 && 4 <= (column_max / 2))`
- pub fn try_new(expr: Expr, schema: SchemaRef) -> Result<Self> {
+ pub fn try_new(expr: Arc<dyn PhysicalExpr>, schema: SchemaRef) ->
Result<Self> {
// build predicate expression once
let mut required_columns = RequiredStatColumns::new();
- let logical_predicate_expr =
+ let predicate_expr =
build_predicate_expression(&expr, schema.as_ref(), &mut
required_columns);
- let stat_fields = required_columns
- .iter()
- .map(|(_, _, f)| f.clone())
- .collect::<Vec<_>>();
- let stat_schema = Schema::new(stat_fields);
- let stat_dfschema = DFSchema::try_from(stat_schema.clone())?;
-
- // TODO allow these properties to be passed in
- let execution_props = ExecutionProps::new();
- let predicate_expr = create_physical_expr(
- &logical_predicate_expr,
- &stat_dfschema,
- &stat_schema,
- &execution_props,
- )?;
Ok(Self {
schema,
predicate_expr,
required_columns,
- logical_expr: expr,
+ orig_expr: expr,
})
}
@@ -215,9 +198,9 @@ impl PruningPredicate {
&self.schema
}
- /// Returns a reference to the logical expr used to construct this pruning
predicate
- pub fn logical_expr(&self) -> &Expr {
- &self.logical_expr
+ /// Returns a reference to the physical expr used to construct this
pruning predicate
+ pub fn orig_expr(&self) -> &Arc<dyn PhysicalExpr> {
+ &self.orig_expr
}
/// Returns a reference to the predicate expr
@@ -227,11 +210,7 @@ impl PruningPredicate {
/// Returns true if this pruning predicate is "always true" (aka will not
prune anything)
pub fn allways_true(&self) -> bool {
- self.predicate_expr
- .as_any()
- .downcast_ref::<Literal>()
- .map(|l| matches!(l.value(), ScalarValue::Boolean(Some(true))))
- .unwrap_or_default()
+ is_always_true(&self.predicate_expr)
}
pub(crate) fn required_columns(&self) -> &RequiredStatColumns {
@@ -239,6 +218,13 @@ impl PruningPredicate {
}
}
+fn is_always_true(expr: &Arc<dyn PhysicalExpr>) -> bool {
+ expr.as_any()
+ .downcast_ref::<phys_expr::Literal>()
+ .map(|l| matches!(l.value(), ScalarValue::Boolean(Some(true))))
+ .unwrap_or_default()
+}
+
/// Records for which columns statistics are necessary to evaluate a
/// pruning predicate.
///
@@ -251,7 +237,7 @@ pub(crate) struct RequiredStatColumns {
/// * Statistics type (e.g. Min or Max or Null_Count)
/// * The field the statistics value should be placed in for
/// pruning predicate evaluation
- columns: Vec<(Column, StatisticsType, Field)>,
+ columns: Vec<(phys_expr::Column, StatisticsType, Field)>,
}
impl RequiredStatColumns {
@@ -269,19 +255,22 @@ impl RequiredStatColumns {
/// Returns an iterator over items in columns (see doc on
/// `self.columns` for details)
- pub(crate) fn iter(&self) -> impl Iterator<Item = &(Column,
StatisticsType, Field)> {
+ pub(crate) fn iter(
+ &self,
+ ) -> impl Iterator<Item = &(phys_expr::Column, StatisticsType, Field)> {
self.columns.iter()
}
- fn is_stat_column_missing(
+ fn find_stat_column(
&self,
- column: &Column,
+ column: &phys_expr::Column,
statistics_type: StatisticsType,
- ) -> bool {
- !self
- .columns
+ ) -> Option<usize> {
+ self.columns
.iter()
- .any(|(c, t, _f)| c == column && t == &statistics_type)
+ .enumerate()
+ .find(|(_i, (c, t, _f))| c == column && t == &statistics_type)
+ .map(|(i, (_c, _t, _f))| i)
}
/// Rewrites column_expr so that all appearances of column
@@ -294,25 +283,27 @@ impl RequiredStatColumns {
/// 5` with the appropriate entry noted in self.columns
fn stat_column_expr(
&mut self,
- column: &Column,
- column_expr: &Expr,
+ column: &phys_expr::Column,
+ column_expr: &Arc<dyn PhysicalExpr>,
field: &Field,
stat_type: StatisticsType,
suffix: &str,
- ) -> Result<Expr> {
- let stat_column = Column {
- relation: column.relation.clone(),
- name: format!("{}_{}", column.flat_name(), suffix),
+ ) -> Result<Arc<dyn PhysicalExpr>> {
+ let (idx, need_to_insert) = match self.find_stat_column(column,
stat_type) {
+ Some(idx) => (idx, false),
+ None => (self.columns.len(), true),
};
- let stat_field = Field::new(
- stat_column.flat_name().as_str(),
- field.data_type().clone(),
- field.is_nullable(),
- );
+ let stat_column =
+ phys_expr::Column::new(&format!("{}_{}", column.name(), suffix),
idx);
- if self.is_stat_column_missing(column, stat_type) {
- // only add statistics column if not previously added
+ // only add statistics column if not previously added
+ if need_to_insert {
+ let stat_field = Field::new(
+ stat_column.name(),
+ field.data_type().clone(),
+ field.is_nullable(),
+ );
self.columns.push((column.clone(), stat_type, stat_field));
}
rewrite_column_expr(column_expr.clone(), column, &stat_column)
@@ -321,30 +312,30 @@ impl RequiredStatColumns {
/// rewrite col --> col_min
fn min_column_expr(
&mut self,
- column: &Column,
- column_expr: &Expr,
+ column: &phys_expr::Column,
+ column_expr: &Arc<dyn PhysicalExpr>,
field: &Field,
- ) -> Result<Expr> {
+ ) -> Result<Arc<dyn PhysicalExpr>> {
self.stat_column_expr(column, column_expr, field, StatisticsType::Min,
"min")
}
/// rewrite col --> col_max
fn max_column_expr(
&mut self,
- column: &Column,
- column_expr: &Expr,
+ column: &phys_expr::Column,
+ column_expr: &Arc<dyn PhysicalExpr>,
field: &Field,
- ) -> Result<Expr> {
+ ) -> Result<Arc<dyn PhysicalExpr>> {
self.stat_column_expr(column, column_expr, field, StatisticsType::Max,
"max")
}
/// rewrite col --> col_null_count
fn null_count_column_expr(
&mut self,
- column: &Column,
- column_expr: &Expr,
+ column: &phys_expr::Column,
+ column_expr: &Arc<dyn PhysicalExpr>,
field: &Field,
- ) -> Result<Expr> {
+ ) -> Result<Arc<dyn PhysicalExpr>> {
self.stat_column_expr(
column,
column_expr,
@@ -355,8 +346,8 @@ impl RequiredStatColumns {
}
}
-impl From<Vec<(Column, StatisticsType, Field)>> for RequiredStatColumns {
- fn from(columns: Vec<(Column, StatisticsType, Field)>) -> Self {
+impl From<Vec<(phys_expr::Column, StatisticsType, Field)>> for
RequiredStatColumns {
+ fn from(columns: Vec<(phys_expr::Column, StatisticsType, Field)>) -> Self {
Self { columns }
}
}
@@ -394,14 +385,15 @@ fn build_statistics_record_batch<S: PruningStatistics>(
let mut arrays = Vec::<ArrayRef>::new();
// For each needed statistics column:
for (column, statistics_type, stat_field) in required_columns.iter() {
+ let column = Column::from_qualified_name(column.name());
let data_type = stat_field.data_type();
let num_containers = statistics.num_containers();
let array = match statistics_type {
- StatisticsType::Min => statistics.min_values(column),
- StatisticsType::Max => statistics.max_values(column),
- StatisticsType::NullCount => statistics.null_counts(column),
+ StatisticsType::Min => statistics.min_values(&column),
+ StatisticsType::Max => statistics.max_values(&column),
+ StatisticsType::NullCount => statistics.null_counts(&column),
};
let array = array.unwrap_or_else(|| new_null_array(data_type,
num_containers));
@@ -438,25 +430,25 @@ fn build_statistics_record_batch<S: PruningStatistics>(
}
struct PruningExpressionBuilder<'a> {
- column: Column,
- column_expr: Expr,
+ column: phys_expr::Column,
+ column_expr: Arc<dyn PhysicalExpr>,
op: Operator,
- scalar_expr: Expr,
+ scalar_expr: Arc<dyn PhysicalExpr>,
field: &'a Field,
required_columns: &'a mut RequiredStatColumns,
}
impl<'a> PruningExpressionBuilder<'a> {
fn try_new(
- left: &'a Expr,
- right: &'a Expr,
+ left: &'a Arc<dyn PhysicalExpr>,
+ right: &'a Arc<dyn PhysicalExpr>,
op: Operator,
schema: &'a Schema,
required_columns: &'a mut RequiredStatColumns,
) -> Result<Self> {
// find column name; input could be a more complicated expression
- let left_columns = left.to_columns()?;
- let right_columns = right.to_columns()?;
+ let left_columns = get_phys_expr_columns(left);
+ let right_columns = get_phys_expr_columns(right);
let (column_expr, scalar_expr, columns, correct_operator) =
match (left_columns.len(), right_columns.len()) {
(1, 0) => (left, right, left_columns, op),
@@ -478,7 +470,7 @@ impl<'a> PruningExpressionBuilder<'a> {
df_schema,
)?;
let column = columns.iter().next().unwrap().clone();
- let field = match schema.column_with_name(&column.flat_name()) {
+ let field = match schema.column_with_name(column.name()) {
Some((_, f)) => f,
_ => {
return Err(DataFusionError::Plan(
@@ -501,16 +493,16 @@ impl<'a> PruningExpressionBuilder<'a> {
self.op
}
- fn scalar_expr(&self) -> &Expr {
+ fn scalar_expr(&self) -> &Arc<dyn PhysicalExpr> {
&self.scalar_expr
}
- fn min_column_expr(&mut self) -> Result<Expr> {
+ fn min_column_expr(&mut self) -> Result<Arc<dyn PhysicalExpr>> {
self.required_columns
.min_column_expr(&self.column, &self.column_expr, self.field)
}
- fn max_column_expr(&mut self) -> Result<Expr> {
+ fn max_column_expr(&mut self) -> Result<Arc<dyn PhysicalExpr>> {
self.required_columns
.max_column_expr(&self.column, &self.column_expr, self.field)
}
@@ -529,64 +521,83 @@ impl<'a> PruningExpressionBuilder<'a> {
///
/// More rewrite rules are still in progress.
fn rewrite_expr_to_prunable(
- column_expr: &Expr,
+ column_expr: &PhysicalExprRef,
op: Operator,
- scalar_expr: &Expr,
+ scalar_expr: &PhysicalExprRef,
schema: DFSchema,
-) -> Result<(Expr, Operator, Expr)> {
+) -> Result<(PhysicalExprRef, Operator, PhysicalExprRef)> {
if !is_compare_op(op) {
return Err(DataFusionError::Plan(
"rewrite_expr_to_prunable only support compare
expression".to_string(),
));
}
- match column_expr {
+ let column_expr_any = column_expr.as_any();
+
+ if column_expr_any
+ .downcast_ref::<phys_expr::Column>()
+ .is_some()
+ {
// `col op lit()`
- Expr::Column(_) => Ok((column_expr.clone(), op, scalar_expr.clone())),
+ Ok((column_expr.clone(), op, scalar_expr.clone()))
+ } else if let Some(cast) =
column_expr_any.downcast_ref::<phys_expr::CastExpr>() {
// `cast(col) op lit()`
- Expr::Cast(Cast { expr, data_type }) => {
- let from_type = expr.get_type(&schema)?;
- verify_support_type_for_prune(&from_type, data_type)?;
- let (left, op, right) =
- rewrite_expr_to_prunable(expr, op, scalar_expr, schema)?;
- Ok((cast(left, data_type.clone()), op, right))
- }
+ let arrow_schema: SchemaRef = schema.clone().into();
+ let from_type = cast.expr().data_type(&arrow_schema)?;
+ verify_support_type_for_prune(&from_type, cast.cast_type())?;
+ let (left, op, right) =
+ rewrite_expr_to_prunable(cast.expr(), op, scalar_expr, schema)?;
+ let left = Arc::new(phys_expr::CastExpr::new(
+ left,
+ cast.cast_type().clone(),
+ DEFAULT_CAST_OPTIONS,
+ ));
+ Ok((left, op, right))
+ } else if let Some(try_cast) =
+ column_expr_any.downcast_ref::<phys_expr::TryCastExpr>()
+ {
// `try_cast(col) op lit()`
- Expr::TryCast(TryCast { expr, data_type }) => {
- let from_type = expr.get_type(&schema)?;
- verify_support_type_for_prune(&from_type, data_type)?;
- let (left, op, right) =
- rewrite_expr_to_prunable(expr, op, scalar_expr, schema)?;
- Ok((try_cast(left, data_type.clone()), op, right))
- }
+ let arrow_schema: SchemaRef = schema.clone().into();
+ let from_type = try_cast.expr().data_type(&arrow_schema)?;
+ verify_support_type_for_prune(&from_type, try_cast.cast_type())?;
+ let (left, op, right) =
+ rewrite_expr_to_prunable(try_cast.expr(), op, scalar_expr,
schema)?;
+ let left = Arc::new(phys_expr::TryCastExpr::new(
+ left,
+ try_cast.cast_type().clone(),
+ ));
+ Ok((left, op, right))
+ } else if let Some(neg) =
column_expr_any.downcast_ref::<phys_expr::NegativeExpr>() {
// `-col > lit()` --> `col < -lit()`
- Expr::Negative(c) => {
- let (left, op, right) = rewrite_expr_to_prunable(c, op,
scalar_expr, schema)?;
- Ok((left, reverse_operator(op)?, Expr::Negative(Box::new(right))))
- }
+ let (left, op, right) =
+ rewrite_expr_to_prunable(neg.arg(), op, scalar_expr, schema)?;
+ let right = Arc::new(phys_expr::NegativeExpr::new(right));
+ Ok((left, reverse_operator(op)?, right))
+ } else if let Some(not) =
column_expr_any.downcast_ref::<phys_expr::NotExpr>() {
// `!col = true` --> `col = !true`
- Expr::Not(c) => {
- if op != Operator::Eq && op != Operator::NotEq {
- return Err(DataFusionError::Plan(
- "Not with operator other than Eq / NotEq is not supported"
- .to_string(),
- ));
- }
- return match c.as_ref() {
- Expr::Column(_) => Ok((
- c.as_ref().clone(),
- reverse_operator(op)?,
- Expr::Not(Box::new(scalar_expr.clone())),
- )),
- _ => Err(DataFusionError::Plan(format!(
- "Not with complex expression {column_expr:?} is not
supported"
- ))),
- };
+ if op != Operator::Eq && op != Operator::NotEq {
+ return Err(DataFusionError::Plan(
+ "Not with operator other than Eq / NotEq is not
supported".to_string(),
+ ));
}
-
- _ => Err(DataFusionError::Plan(format!(
+ if not
+ .arg()
+ .as_any()
+ .downcast_ref::<phys_expr::Column>()
+ .is_some()
+ {
+ let left = not.arg().clone();
+ let right = Arc::new(phys_expr::NotExpr::new(scalar_expr.clone()));
+ Ok((left, reverse_operator(op)?, right))
+ } else {
+ Err(DataFusionError::Plan(format!(
+ "Not with complex expression {column_expr:?} is not supported"
+ )))
+ }
+ } else {
+ Err(DataFusionError::Plan(format!(
"column expression {column_expr:?} is not supported"
- ))),
+ )))
}
}
@@ -629,14 +640,32 @@ fn verify_support_type_for_prune(from_type: &DataType,
to_type: &DataType) -> Re
/// replaces a column with an old name with a new name in an expression
fn rewrite_column_expr(
- e: Expr,
- column_old: &Column,
- column_new: &Column,
-) -> Result<Expr> {
- rewrite_expr(e, |expr| match expr {
- Expr::Column(c) if c == *column_old =>
Ok(Expr::Column(column_new.clone())),
- _ => Ok(expr),
- })
+ e: Arc<dyn PhysicalExpr>,
+ column_old: &phys_expr::Column,
+ column_new: &phys_expr::Column,
+) -> Result<Arc<dyn PhysicalExpr>> {
+ let mut rewriter = RewriteColumnExpr {
+ column_old,
+ column_new,
+ };
+ e.transform_using(&mut rewriter)
+}
+
+struct RewriteColumnExpr<'a> {
+ column_old: &'a phys_expr::Column,
+ column_new: &'a phys_expr::Column,
+}
+
+impl<'a> TreeNodeRewriter<Arc<dyn PhysicalExpr>> for RewriteColumnExpr<'a> {
+ fn mutate(&mut self, expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn
PhysicalExpr>> {
+ if let Some(column) =
expr.as_any().downcast_ref::<phys_expr::Column>() {
+ if column == self.column_old {
+ return Ok(Arc::new(self.column_new.clone()));
+ }
+ }
+
+ Ok(expr)
+ }
}
fn reverse_operator(op: Operator) -> Result<Operator> {
@@ -652,15 +681,15 @@ fn reverse_operator(op: Operator) -> Result<Operator> {
/// if the column may contain values, and false if definitely does not
/// contain values
fn build_single_column_expr(
- column: &Column,
+ column: &phys_expr::Column,
schema: &Schema,
required_columns: &mut RequiredStatColumns,
is_not: bool, // if true, treat as !col
-) -> Option<Expr> {
- let field = schema.field_with_name(&column.name).ok()?;
+) -> Option<Arc<dyn PhysicalExpr>> {
+ let field = schema.field_with_name(column.name()).ok()?;
if matches!(field.data_type(), &DataType::Boolean) {
- let col_ref = Expr::Column(column.clone());
+ let col_ref = Arc::new(column.clone()) as _;
let min = required_columns
.min_column_expr(column, &col_ref, field)
@@ -675,11 +704,13 @@ fn build_single_column_expr(
if is_not {
// The only way we know a column couldn't match is if both the min
and max are true
// !(min && max)
- Some(!(min.and(max)))
+ Some(Arc::new(phys_expr::NotExpr::new(Arc::new(
+ phys_expr::BinaryExpr::new(min, Operator::And, max),
+ ))))
} else {
// the only way we know a column couldn't match is if both the min
and max are false
// !(!min && !max) --> min || max
- Some(min.or(max))
+ Some(Arc::new(phys_expr::BinaryExpr::new(min, Operator::Or, max)))
}
} else {
None
@@ -691,24 +722,27 @@ fn build_single_column_expr(
/// if the column may contain null, and false if definitely does not
/// contain null.
fn build_is_null_column_expr(
- expr: &Expr,
+ expr: &Arc<dyn PhysicalExpr>,
schema: &Schema,
required_columns: &mut RequiredStatColumns,
-) -> Option<Expr> {
- match expr {
- Expr::Column(ref col) => {
- let field = schema.field_with_name(&col.name).ok()?;
-
- let null_count_field = &Field::new(field.name(), DataType::UInt64,
true);
- required_columns
- .null_count_column_expr(col, expr, null_count_field)
- .map(|null_count_column_expr| {
- // IsNull(column) => null_count > 0
- null_count_column_expr.gt(lit::<u64>(0))
- })
- .ok()
- }
- _ => None,
+) -> Option<Arc<dyn PhysicalExpr>> {
+ if let Some(col) = expr.as_any().downcast_ref::<phys_expr::Column>() {
+ let field = schema.field_with_name(col.name()).ok()?;
+
+ let null_count_field = &Field::new(field.name(), DataType::UInt64,
true);
+ required_columns
+ .null_count_column_expr(col, expr, null_count_field)
+ .map(|null_count_column_expr| {
+ // IsNull(column) => null_count > 0
+ Arc::new(phys_expr::BinaryExpr::new(
+ null_count_column_expr,
+ Operator::Gt,
+
Arc::new(phys_expr::Literal::new(ScalarValue::UInt64(Some(0)))),
+ )) as _
+ })
+ .ok()
+ } else {
+ None
}
}
@@ -718,70 +752,95 @@ fn build_is_null_column_expr(
///
/// Returns the pruning predicate as an [`Expr`]
fn build_predicate_expression(
- expr: &Expr,
+ expr: &Arc<dyn PhysicalExpr>,
schema: &Schema,
required_columns: &mut RequiredStatColumns,
-) -> Expr {
+) -> Arc<dyn PhysicalExpr> {
// Returned for unsupported expressions. Such expressions are
// converted to TRUE.
- let unhandled = lit(true);
+ let unhandled =
Arc::new(phys_expr::Literal::new(ScalarValue::Boolean(Some(true))));
// predicate expression can only be a binary expression
- let (left, op, right) = match expr {
- Expr::BinaryExpr(BinaryExpr { left, op, right }) => (left, *op, right),
- Expr::IsNull(expr) => {
- return build_is_null_column_expr(expr, schema, required_columns)
- .unwrap_or(unhandled);
- }
- Expr::Column(col) => {
- return build_single_column_expr(col, schema, required_columns,
false)
+ let expr_any = expr.as_any();
+ if let Some(is_null) = expr_any.downcast_ref::<phys_expr::IsNullExpr>() {
+ return build_is_null_column_expr(is_null.arg(), schema,
required_columns)
+ .unwrap_or(unhandled);
+ }
+ if let Some(col) = expr_any.downcast_ref::<phys_expr::Column>() {
+ return build_single_column_expr(col, schema, required_columns, false)
+ .unwrap_or(unhandled);
+ }
+ if let Some(not) = expr_any.downcast_ref::<phys_expr::NotExpr>() {
+ // match !col (don't do so recursively)
+ if let Some(col) =
not.arg().as_any().downcast_ref::<phys_expr::Column>() {
+ return build_single_column_expr(col, schema, required_columns,
true)
.unwrap_or(unhandled);
+ } else {
+ return unhandled;
}
- // match !col (don't do so recursively)
- Expr::Not(input) => {
- if let Expr::Column(col) = input.as_ref() {
- return build_single_column_expr(col, schema, required_columns,
true)
- .unwrap_or(unhandled);
+ }
+ if let Some(in_list) = expr_any.downcast_ref::<phys_expr::InListExpr>() {
+ if !in_list.list().is_empty() && in_list.list().len() < 20 {
+ let eq_op = if in_list.negated() {
+ Operator::NotEq
} else {
- return unhandled;
- }
- }
- Expr::InList {
- expr,
- list,
- negated,
- } if !list.is_empty() && list.len() < 20 => {
- let eq_fun = if *negated { Expr::not_eq } else { Expr::eq };
- let re_fun = if *negated { Expr::and } else { Expr::or };
- let change_expr = list
+ Operator::Eq
+ };
+ let re_op = if in_list.negated() {
+ Operator::And
+ } else {
+ Operator::Or
+ };
+ let change_expr = in_list
+ .list()
.iter()
- .map(|e| eq_fun(*expr.clone(), e.clone()))
- .reduce(re_fun)
+ .cloned()
+ .map(|e| {
+ Arc::new(phys_expr::BinaryExpr::new(
+ in_list.expr().clone(),
+ eq_op,
+ e.clone(),
+ )) as _
+ })
+ .reduce(|a, b| Arc::new(phys_expr::BinaryExpr::new(a, re_op,
b)) as _)
.unwrap();
return build_predicate_expression(&change_expr, schema,
required_columns);
+ } else {
+ return unhandled;
}
- _ => {
+ }
+
+ let (left, op, right) = {
+ if let Some(bin_expr) =
expr_any.downcast_ref::<phys_expr::BinaryExpr>() {
+ (
+ bin_expr.left().clone(),
+ *bin_expr.op(),
+ bin_expr.right().clone(),
+ )
+ } else {
return unhandled;
}
};
if op == Operator::And || op == Operator::Or {
- let left_expr = build_predicate_expression(left, schema,
required_columns);
- let right_expr = build_predicate_expression(right, schema,
required_columns);
+ let left_expr = build_predicate_expression(&left, schema,
required_columns);
+ let right_expr = build_predicate_expression(&right, schema,
required_columns);
// simplify boolean expression if applicable
let expr = match (&left_expr, op, &right_expr) {
- (left, Operator::And, _) if *left == unhandled => right_expr,
- (_, Operator::And, right) if *right == unhandled => left_expr,
- (left, Operator::Or, right) if *left == unhandled || *right ==
unhandled => {
+ (left, Operator::And, _) if is_always_true(left) => right_expr,
+ (_, Operator::And, right) if is_always_true(right) => left_expr,
+ (left, Operator::Or, right)
+ if is_always_true(left) || is_always_true(right) =>
+ {
unhandled
}
- _ => binary_expr(left_expr, op, right_expr),
+ _ => Arc::new(phys_expr::BinaryExpr::new(left_expr, op,
right_expr)),
};
return expr;
}
let expr_builder =
- PruningExpressionBuilder::try_new(left, right, op, schema,
required_columns);
+ PruningExpressionBuilder::try_new(&left, &right, op, schema,
required_columns);
let mut expr_builder = match expr_builder {
Ok(builder) => builder,
// allow partial failure in predicate expression generation
@@ -794,8 +853,10 @@ fn build_predicate_expression(
build_statistics_expr(&mut expr_builder).unwrap_or(unhandled)
}
-fn build_statistics_expr(expr_builder: &mut PruningExpressionBuilder) ->
Result<Expr> {
- let statistics_expr =
+fn build_statistics_expr(
+ expr_builder: &mut PruningExpressionBuilder,
+) -> Result<Arc<dyn PhysicalExpr>> {
+ let statistics_expr: Arc<dyn PhysicalExpr> =
match expr_builder.op() {
Operator::NotEq => {
// column != literal => (min, max) = literal =>
@@ -803,42 +864,70 @@ fn build_statistics_expr(expr_builder: &mut
PruningExpressionBuilder) -> Result<
// min != literal || literal != max
let min_column_expr = expr_builder.min_column_expr()?;
let max_column_expr = expr_builder.max_column_expr()?;
- min_column_expr
- .not_eq(expr_builder.scalar_expr().clone())
-
.or(expr_builder.scalar_expr().clone().not_eq(max_column_expr))
+ Arc::new(phys_expr::BinaryExpr::new(
+ Arc::new(phys_expr::BinaryExpr::new(
+ min_column_expr,
+ Operator::NotEq,
+ expr_builder.scalar_expr().clone(),
+ )),
+ Operator::Or,
+ Arc::new(phys_expr::BinaryExpr::new(
+ expr_builder.scalar_expr().clone(),
+ Operator::NotEq,
+ max_column_expr,
+ )),
+ ))
}
Operator::Eq => {
// column = literal => (min, max) = literal => min <= literal
&& literal <= max
// (column / 2) = 4 => (column_min / 2) <= 4 && 4 <=
(column_max / 2)
let min_column_expr = expr_builder.min_column_expr()?;
let max_column_expr = expr_builder.max_column_expr()?;
- min_column_expr
- .lt_eq(expr_builder.scalar_expr().clone())
-
.and(expr_builder.scalar_expr().clone().lt_eq(max_column_expr))
+ Arc::new(phys_expr::BinaryExpr::new(
+ Arc::new(phys_expr::BinaryExpr::new(
+ min_column_expr,
+ Operator::LtEq,
+ expr_builder.scalar_expr().clone(),
+ )),
+ Operator::And,
+ Arc::new(phys_expr::BinaryExpr::new(
+ expr_builder.scalar_expr().clone(),
+ Operator::LtEq,
+ max_column_expr,
+ )),
+ ))
}
Operator::Gt => {
// column > literal => (min, max) > literal => max > literal
- expr_builder
- .max_column_expr()?
- .gt(expr_builder.scalar_expr().clone())
+ Arc::new(phys_expr::BinaryExpr::new(
+ expr_builder.max_column_expr()?,
+ Operator::Gt,
+ expr_builder.scalar_expr().clone(),
+ ))
}
Operator::GtEq => {
// column >= literal => (min, max) >= literal => max >= literal
- expr_builder
- .max_column_expr()?
- .gt_eq(expr_builder.scalar_expr().clone())
+ Arc::new(phys_expr::BinaryExpr::new(
+ expr_builder.max_column_expr()?,
+ Operator::GtEq,
+ expr_builder.scalar_expr().clone(),
+ ))
}
Operator::Lt => {
// column < literal => (min, max) < literal => min < literal
- expr_builder
- .min_column_expr()?
- .lt(expr_builder.scalar_expr().clone())
+ Arc::new(phys_expr::BinaryExpr::new(
+ expr_builder.min_column_expr()?,
+ Operator::Lt,
+ expr_builder.scalar_expr().clone(),
+ ))
}
Operator::LtEq => {
// column <= literal => (min, max) <= literal => min <= literal
- expr_builder
- .min_column_expr()?
- .lt_eq(expr_builder.scalar_expr().clone())
+ Arc::new(phys_expr::BinaryExpr::new(
+ expr_builder.min_column_expr()?,
+ Operator::LtEq,
+ expr_builder.scalar_expr().clone(),
+ ))
}
// other expressions are not supported
_ => return Err(DataFusionError::Plan(
@@ -867,8 +956,10 @@ mod tests {
array::{BinaryArray, Int32Array, Int64Array, StringArray},
datatypes::{DataType, TimeUnit},
};
- use datafusion_common::ScalarValue;
- use datafusion_expr::{cast, is_null};
+ use datafusion_common::{ScalarValue, ToDFSchema};
+ use datafusion_expr::{cast, is_null, try_cast, Expr};
+ use datafusion_physical_expr::create_physical_expr;
+ use datafusion_physical_expr::execution_props::ExecutionProps;
use std::collections::HashMap;
#[derive(Debug)]
@@ -1093,25 +1184,25 @@ mod tests {
let required_columns = RequiredStatColumns::from(vec![
// min of original column s1, named s1_min
(
- "s1".into(),
+ phys_expr::Column::new("s1", 1),
StatisticsType::Min,
Field::new("s1_min", DataType::Int32, true),
),
// max of original column s2, named s2_max
(
- "s2".into(),
+ phys_expr::Column::new("s2", 2),
StatisticsType::Max,
Field::new("s2_max", DataType::Int32, true),
),
// max of original column s3, named s3_max
(
- "s3".into(),
+ phys_expr::Column::new("s3", 3),
StatisticsType::Max,
Field::new("s3_max", DataType::Utf8, true),
),
// min of original column s3, named s3_min
(
- "s3".into(),
+ phys_expr::Column::new("s3", 3),
StatisticsType::Min,
Field::new("s3_min", DataType::Utf8, true),
),
@@ -1163,7 +1254,7 @@ mod tests {
// Request a record batch with of s1_min as a timestamp
let required_columns = RequiredStatColumns::from(vec![(
- "s3".into(),
+ phys_expr::Column::new("s3", 3),
StatisticsType::Min,
Field::new(
"s1_min",
@@ -1213,7 +1304,7 @@ mod tests {
// Request a record batch with of s1_min as a timestamp
let required_columns = RequiredStatColumns::from(vec![(
- "s3".into(),
+ phys_expr::Column::new("s3", 3),
StatisticsType::Min,
Field::new("s1_min", DataType::Utf8, true),
)]);
@@ -1242,7 +1333,7 @@ mod tests {
fn test_build_statistics_inconsistent_length() {
// return an inconsistent length to the actual statistics arrays
let required_columns = RequiredStatColumns::from(vec![(
- "s1".into(),
+ phys_expr::Column::new("s1", 3),
StatisticsType::Min,
Field::new("s1_min", DataType::Int64, true),
)]);
@@ -1268,19 +1359,25 @@ mod tests {
#[test]
fn row_group_predicate_eq() -> Result<()> {
let schema = Schema::new(vec![Field::new("c1", DataType::Int32,
false)]);
- let expected_expr = "c1_min <= Int32(1) AND Int32(1) <= c1_max";
+ let expected_expr = "c1_min@0 <= 1 AND 1 <= c1_max@1";
// test column on the left
let expr = col("c1").eq(lit(1));
- let predicate_expr =
- build_predicate_expression(&expr, &schema, &mut
RequiredStatColumns::new());
- assert_eq!(format!("{predicate_expr:?}"), expected_expr);
+ let predicate_expr = test_build_predicate_expression(
+ &expr,
+ &schema,
+ &mut RequiredStatColumns::new(),
+ );
+ assert_eq!(predicate_expr.to_string(), expected_expr);
// test column on the right
let expr = lit(1).eq(col("c1"));
- let predicate_expr =
- build_predicate_expression(&expr, &schema, &mut
RequiredStatColumns::new());
- assert_eq!(format!("{predicate_expr:?}"), expected_expr);
+ let predicate_expr = test_build_predicate_expression(
+ &expr,
+ &schema,
+ &mut RequiredStatColumns::new(),
+ );
+ assert_eq!(predicate_expr.to_string(), expected_expr);
Ok(())
}
@@ -1288,19 +1385,25 @@ mod tests {
#[test]
fn row_group_predicate_not_eq() -> Result<()> {
let schema = Schema::new(vec![Field::new("c1", DataType::Int32,
false)]);
- let expected_expr = "c1_min != Int32(1) OR Int32(1) != c1_max";
+ let expected_expr = "c1_min@0 != 1 OR 1 != c1_max@1";
// test column on the left
let expr = col("c1").not_eq(lit(1));
- let predicate_expr =
- build_predicate_expression(&expr, &schema, &mut
RequiredStatColumns::new());
- assert_eq!(format!("{predicate_expr:?}"), expected_expr);
+ let predicate_expr = test_build_predicate_expression(
+ &expr,
+ &schema,
+ &mut RequiredStatColumns::new(),
+ );
+ assert_eq!(predicate_expr.to_string(), expected_expr);
// test column on the right
let expr = lit(1).not_eq(col("c1"));
- let predicate_expr =
- build_predicate_expression(&expr, &schema, &mut
RequiredStatColumns::new());
- assert_eq!(format!("{predicate_expr:?}"), expected_expr);
+ let predicate_expr = test_build_predicate_expression(
+ &expr,
+ &schema,
+ &mut RequiredStatColumns::new(),
+ );
+ assert_eq!(predicate_expr.to_string(), expected_expr);
Ok(())
}
@@ -1308,19 +1411,25 @@ mod tests {
#[test]
fn row_group_predicate_gt() -> Result<()> {
let schema = Schema::new(vec![Field::new("c1", DataType::Int32,
false)]);
- let expected_expr = "c1_max > Int32(1)";
+ let expected_expr = "c1_max@0 > 1";
// test column on the left
let expr = col("c1").gt(lit(1));
- let predicate_expr =
- build_predicate_expression(&expr, &schema, &mut
RequiredStatColumns::new());
- assert_eq!(format!("{predicate_expr:?}"), expected_expr);
+ let predicate_expr = test_build_predicate_expression(
+ &expr,
+ &schema,
+ &mut RequiredStatColumns::new(),
+ );
+ assert_eq!(predicate_expr.to_string(), expected_expr);
// test column on the right
let expr = lit(1).lt(col("c1"));
- let predicate_expr =
- build_predicate_expression(&expr, &schema, &mut
RequiredStatColumns::new());
- assert_eq!(format!("{predicate_expr:?}"), expected_expr);
+ let predicate_expr = test_build_predicate_expression(
+ &expr,
+ &schema,
+ &mut RequiredStatColumns::new(),
+ );
+ assert_eq!(predicate_expr.to_string(), expected_expr);
Ok(())
}
@@ -1328,18 +1437,24 @@ mod tests {
#[test]
fn row_group_predicate_gt_eq() -> Result<()> {
let schema = Schema::new(vec![Field::new("c1", DataType::Int32,
false)]);
- let expected_expr = "c1_max >= Int32(1)";
+ let expected_expr = "c1_max@0 >= 1";
// test column on the left
let expr = col("c1").gt_eq(lit(1));
- let predicate_expr =
- build_predicate_expression(&expr, &schema, &mut
RequiredStatColumns::new());
- assert_eq!(format!("{predicate_expr:?}"), expected_expr);
+ let predicate_expr = test_build_predicate_expression(
+ &expr,
+ &schema,
+ &mut RequiredStatColumns::new(),
+ );
+ assert_eq!(predicate_expr.to_string(), expected_expr);
// test column on the right
let expr = lit(1).lt_eq(col("c1"));
- let predicate_expr =
- build_predicate_expression(&expr, &schema, &mut
RequiredStatColumns::new());
- assert_eq!(format!("{predicate_expr:?}"), expected_expr);
+ let predicate_expr = test_build_predicate_expression(
+ &expr,
+ &schema,
+ &mut RequiredStatColumns::new(),
+ );
+ assert_eq!(predicate_expr.to_string(), expected_expr);
Ok(())
}
@@ -1347,19 +1462,25 @@ mod tests {
#[test]
fn row_group_predicate_lt() -> Result<()> {
let schema = Schema::new(vec![Field::new("c1", DataType::Int32,
false)]);
- let expected_expr = "c1_min < Int32(1)";
+ let expected_expr = "c1_min@0 < 1";
// test column on the left
let expr = col("c1").lt(lit(1));
- let predicate_expr =
- build_predicate_expression(&expr, &schema, &mut
RequiredStatColumns::new());
- assert_eq!(format!("{predicate_expr:?}"), expected_expr);
+ let predicate_expr = test_build_predicate_expression(
+ &expr,
+ &schema,
+ &mut RequiredStatColumns::new(),
+ );
+ assert_eq!(predicate_expr.to_string(), expected_expr);
// test column on the right
let expr = lit(1).gt(col("c1"));
- let predicate_expr =
- build_predicate_expression(&expr, &schema, &mut
RequiredStatColumns::new());
- assert_eq!(format!("{predicate_expr:?}"), expected_expr);
+ let predicate_expr = test_build_predicate_expression(
+ &expr,
+ &schema,
+ &mut RequiredStatColumns::new(),
+ );
+ assert_eq!(predicate_expr.to_string(), expected_expr);
Ok(())
}
@@ -1367,18 +1488,24 @@ mod tests {
#[test]
fn row_group_predicate_lt_eq() -> Result<()> {
let schema = Schema::new(vec![Field::new("c1", DataType::Int32,
false)]);
- let expected_expr = "c1_min <= Int32(1)";
+ let expected_expr = "c1_min@0 <= 1";
// test column on the left
let expr = col("c1").lt_eq(lit(1));
- let predicate_expr =
- build_predicate_expression(&expr, &schema, &mut
RequiredStatColumns::new());
- assert_eq!(format!("{predicate_expr:?}"), expected_expr);
+ let predicate_expr = test_build_predicate_expression(
+ &expr,
+ &schema,
+ &mut RequiredStatColumns::new(),
+ );
+ assert_eq!(predicate_expr.to_string(), expected_expr);
// test column on the right
let expr = lit(1).gt_eq(col("c1"));
- let predicate_expr =
- build_predicate_expression(&expr, &schema, &mut
RequiredStatColumns::new());
- assert_eq!(format!("{predicate_expr:?}"), expected_expr);
+ let predicate_expr = test_build_predicate_expression(
+ &expr,
+ &schema,
+ &mut RequiredStatColumns::new(),
+ );
+ assert_eq!(predicate_expr.to_string(), expected_expr);
Ok(())
}
@@ -1392,10 +1519,13 @@ mod tests {
]);
// test AND operator joining supported c1 < 1 expression and
unsupported c2 > c3 expression
let expr = col("c1").lt(lit(1)).and(col("c2").lt(col("c3")));
- let expected_expr = "c1_min < Int32(1)";
- let predicate_expr =
- build_predicate_expression(&expr, &schema, &mut
RequiredStatColumns::new());
- assert_eq!(format!("{predicate_expr:?}"), expected_expr);
+ let expected_expr = "c1_min@0 < 1";
+ let predicate_expr = test_build_predicate_expression(
+ &expr,
+ &schema,
+ &mut RequiredStatColumns::new(),
+ );
+ assert_eq!(predicate_expr.to_string(), expected_expr);
Ok(())
}
@@ -1406,12 +1536,17 @@ mod tests {
Field::new("c1", DataType::Int32, false),
Field::new("c2", DataType::Int32, false),
]);
- // test OR operator joining supported c1 < 1 expression and
unsupported c2 % 2 expression
- let expr = col("c1").lt(lit(1)).or(col("c2").modulus(lit(2)));
- let expected_expr = "Boolean(true)";
- let predicate_expr =
- build_predicate_expression(&expr, &schema, &mut
RequiredStatColumns::new());
- assert_eq!(format!("{predicate_expr:?}"), expected_expr);
+ // test OR operator joining supported c1 < 1 expression and
unsupported c2 % 2 = 0 expression
+ let expr = col("c1")
+ .lt(lit(1))
+ .or(col("c2").modulus(lit(2)).eq(lit(0)));
+ let expected_expr = "true";
+ let predicate_expr = test_build_predicate_expression(
+ &expr,
+ &schema,
+ &mut RequiredStatColumns::new(),
+ );
+ assert_eq!(predicate_expr.to_string(), expected_expr);
Ok(())
}
@@ -1419,12 +1554,15 @@ mod tests {
#[test]
fn row_group_predicate_not() -> Result<()> {
let schema = Schema::new(vec![Field::new("c1", DataType::Int32,
false)]);
- let expected_expr = "Boolean(true)";
+ let expected_expr = "true";
let expr = col("c1").not();
- let predicate_expr =
- build_predicate_expression(&expr, &schema, &mut
RequiredStatColumns::new());
- assert_eq!(format!("{predicate_expr:?}"), expected_expr);
+ let predicate_expr = test_build_predicate_expression(
+ &expr,
+ &schema,
+ &mut RequiredStatColumns::new(),
+ );
+ assert_eq!(predicate_expr.to_string(), expected_expr);
Ok(())
}
@@ -1432,12 +1570,15 @@ mod tests {
#[test]
fn row_group_predicate_not_bool() -> Result<()> {
let schema = Schema::new(vec![Field::new("c1", DataType::Boolean,
false)]);
- let expected_expr = "NOT c1_min AND c1_max";
+ let expected_expr = "NOT c1_min@0 AND c1_max@1";
let expr = col("c1").not();
- let predicate_expr =
- build_predicate_expression(&expr, &schema, &mut
RequiredStatColumns::new());
- assert_eq!(format!("{predicate_expr:?}"), expected_expr);
+ let predicate_expr = test_build_predicate_expression(
+ &expr,
+ &schema,
+ &mut RequiredStatColumns::new(),
+ );
+ assert_eq!(predicate_expr.to_string(), expected_expr);
Ok(())
}
@@ -1445,12 +1586,15 @@ mod tests {
#[test]
fn row_group_predicate_bool() -> Result<()> {
let schema = Schema::new(vec![Field::new("c1", DataType::Boolean,
false)]);
- let expected_expr = "c1_min OR c1_max";
+ let expected_expr = "c1_min@0 OR c1_max@1";
let expr = col("c1");
- let predicate_expr =
- build_predicate_expression(&expr, &schema, &mut
RequiredStatColumns::new());
- assert_eq!(format!("{predicate_expr:?}"), expected_expr);
+ let predicate_expr = test_build_predicate_expression(
+ &expr,
+ &schema,
+ &mut RequiredStatColumns::new(),
+ );
+ assert_eq!(predicate_expr.to_string(), expected_expr);
Ok(())
}
@@ -1458,14 +1602,17 @@ mod tests {
#[test]
fn row_group_predicate_lt_bool() -> Result<()> {
let schema = Schema::new(vec![Field::new("c1", DataType::Boolean,
false)]);
- let expected_expr = "c1_min < Boolean(true)";
+ let expected_expr = "c1_min@0 < true";
// DF doesn't support arithmetic on boolean columns so
// this predicate will error when evaluated
let expr = col("c1").lt(lit(true));
- let predicate_expr =
- build_predicate_expression(&expr, &schema, &mut
RequiredStatColumns::new());
- assert_eq!(format!("{predicate_expr:?}"), expected_expr);
+ let predicate_expr = test_build_predicate_expression(
+ &expr,
+ &schema,
+ &mut RequiredStatColumns::new(),
+ );
+ assert_eq!(predicate_expr.to_string(), expected_expr);
Ok(())
}
@@ -1481,26 +1628,38 @@ mod tests {
let expr = col("c1")
.lt(lit(1))
.and(col("c2").eq(lit(2)).or(col("c2").eq(lit(3))));
- let expected_expr = "c1_min < Int32(1) AND (c2_min <= Int32(2) AND
Int32(2) <= c2_max OR c2_min <= Int32(3) AND Int32(3) <= c2_max)";
+ let expected_expr = "c1_min@0 < 1 AND (c2_min@1 <= 2 AND 2 <= c2_max@2
OR c2_min@1 <= 3 AND 3 <= c2_max@2)";
let predicate_expr =
- build_predicate_expression(&expr, &schema, &mut required_columns);
- assert_eq!(format!("{predicate_expr:?}"), expected_expr);
+ test_build_predicate_expression(&expr, &schema, &mut
required_columns);
+ assert_eq!(predicate_expr.to_string(), expected_expr);
// c1 < 1 should add c1_min
let c1_min_field = Field::new("c1_min", DataType::Int32, false);
assert_eq!(
required_columns.columns[0],
- ("c1".into(), StatisticsType::Min, c1_min_field)
+ (
+ phys_expr::Column::new("c1", 0),
+ StatisticsType::Min,
+ c1_min_field
+ )
);
// c2 = 2 should add c2_min and c2_max
let c2_min_field = Field::new("c2_min", DataType::Int32, false);
assert_eq!(
required_columns.columns[1],
- ("c2".into(), StatisticsType::Min, c2_min_field)
+ (
+ phys_expr::Column::new("c2", 1),
+ StatisticsType::Min,
+ c2_min_field
+ )
);
let c2_max_field = Field::new("c2_max", DataType::Int32, false);
assert_eq!(
required_columns.columns[2],
- ("c2".into(), StatisticsType::Max, c2_max_field)
+ (
+ phys_expr::Column::new("c2", 1),
+ StatisticsType::Max,
+ c2_max_field
+ )
);
// c2 = 3 shouldn't add any new statistics fields
assert_eq!(required_columns.columns.len(), 3);
@@ -1520,10 +1679,13 @@ mod tests {
list: vec![lit(1), lit(2), lit(3)],
negated: false,
};
- let expected_expr = "c1_min <= Int32(1) AND Int32(1) <= c1_max OR
c1_min <= Int32(2) AND Int32(2) <= c1_max OR c1_min <= Int32(3) AND Int32(3) <=
c1_max";
- let predicate_expr =
- build_predicate_expression(&expr, &schema, &mut
RequiredStatColumns::new());
- assert_eq!(format!("{predicate_expr:?}"), expected_expr);
+ let expected_expr = "c1_min@0 <= 1 AND 1 <= c1_max@1 OR c1_min@0 <= 2
AND 2 <= c1_max@1 OR c1_min@0 <= 3 AND 3 <= c1_max@1";
+ let predicate_expr = test_build_predicate_expression(
+ &expr,
+ &schema,
+ &mut RequiredStatColumns::new(),
+ );
+ assert_eq!(predicate_expr.to_string(), expected_expr);
Ok(())
}
@@ -1540,10 +1702,13 @@ mod tests {
list: vec![],
negated: false,
};
- let expected_expr = "Boolean(true)";
- let predicate_expr =
- build_predicate_expression(&expr, &schema, &mut
RequiredStatColumns::new());
- assert_eq!(format!("{predicate_expr:?}"), expected_expr);
+ let expected_expr = "true";
+ let predicate_expr = test_build_predicate_expression(
+ &expr,
+ &schema,
+ &mut RequiredStatColumns::new(),
+ );
+ assert_eq!(predicate_expr.to_string(), expected_expr);
Ok(())
}
@@ -1560,12 +1725,15 @@ mod tests {
list: vec![lit(1), lit(2), lit(3)],
negated: true,
};
- let expected_expr = "(c1_min != Int32(1) OR Int32(1) != c1_max) \
- AND (c1_min != Int32(2) OR Int32(2) != c1_max) \
- AND (c1_min != Int32(3) OR Int32(3) != c1_max)";
- let predicate_expr =
- build_predicate_expression(&expr, &schema, &mut
RequiredStatColumns::new());
- assert_eq!(format!("{predicate_expr:?}"), expected_expr);
+ let expected_expr = "(c1_min@0 != 1 OR 1 != c1_max@1) \
+ AND (c1_min@0 != 2 OR 2 != c1_max@1) \
+ AND (c1_min@0 != 3 OR 3 != c1_max@1)";
+ let predicate_expr = test_build_predicate_expression(
+ &expr,
+ &schema,
+ &mut RequiredStatColumns::new(),
+ );
+ assert_eq!(predicate_expr.to_string(), expected_expr);
Ok(())
}
@@ -1574,35 +1742,47 @@ mod tests {
fn row_group_predicate_cast() -> Result<()> {
let schema = Schema::new(vec![Field::new("c1", DataType::Int32,
false)]);
let expected_expr =
- "CAST(c1_min AS Int64) <= Int64(1) AND Int64(1) <= CAST(c1_max AS
Int64)";
+ "CAST(c1_min@0 AS Int64) <= 1 AND 1 <= CAST(c1_max@1 AS Int64)";
// test column on the left
let expr = cast(col("c1"),
DataType::Int64).eq(lit(ScalarValue::Int64(Some(1))));
- let predicate_expr =
- build_predicate_expression(&expr, &schema, &mut
RequiredStatColumns::new());
- assert_eq!(format!("{predicate_expr:?}"), expected_expr);
+ let predicate_expr = test_build_predicate_expression(
+ &expr,
+ &schema,
+ &mut RequiredStatColumns::new(),
+ );
+ assert_eq!(predicate_expr.to_string(), expected_expr);
// test column on the right
let expr = lit(ScalarValue::Int64(Some(1))).eq(cast(col("c1"),
DataType::Int64));
- let predicate_expr =
- build_predicate_expression(&expr, &schema, &mut
RequiredStatColumns::new());
- assert_eq!(format!("{predicate_expr:?}"), expected_expr);
+ let predicate_expr = test_build_predicate_expression(
+ &expr,
+ &schema,
+ &mut RequiredStatColumns::new(),
+ );
+ assert_eq!(predicate_expr.to_string(), expected_expr);
- let expected_expr = "TRY_CAST(c1_max AS Int64) > Int64(1)";
+ let expected_expr = "TRY_CAST(c1_max@0 AS Int64) > 1";
// test column on the left
let expr =
try_cast(col("c1"),
DataType::Int64).gt(lit(ScalarValue::Int64(Some(1))));
- let predicate_expr =
- build_predicate_expression(&expr, &schema, &mut
RequiredStatColumns::new());
- assert_eq!(format!("{predicate_expr:?}"), expected_expr);
+ let predicate_expr = test_build_predicate_expression(
+ &expr,
+ &schema,
+ &mut RequiredStatColumns::new(),
+ );
+ assert_eq!(predicate_expr.to_string(), expected_expr);
// test column on the right
let expr =
lit(ScalarValue::Int64(Some(1))).lt(try_cast(col("c1"),
DataType::Int64));
- let predicate_expr =
- build_predicate_expression(&expr, &schema, &mut
RequiredStatColumns::new());
- assert_eq!(format!("{predicate_expr:?}"), expected_expr);
+ let predicate_expr = test_build_predicate_expression(
+ &expr,
+ &schema,
+ &mut RequiredStatColumns::new(),
+ );
+ assert_eq!(predicate_expr.to_string(), expected_expr);
Ok(())
}
@@ -1620,10 +1800,13 @@ mod tests {
],
negated: false,
};
- let expected_expr = "CAST(c1_min AS Int64) <= Int64(1) AND Int64(1) <=
CAST(c1_max AS Int64) OR CAST(c1_min AS Int64) <= Int64(2) AND Int64(2) <=
CAST(c1_max AS Int64) OR CAST(c1_min AS Int64) <= Int64(3) AND Int64(3) <=
CAST(c1_max AS Int64)";
- let predicate_expr =
- build_predicate_expression(&expr, &schema, &mut
RequiredStatColumns::new());
- assert_eq!(format!("{predicate_expr:?}"), expected_expr);
+ let expected_expr = "CAST(c1_min@0 AS Int64) <= 1 AND 1 <=
CAST(c1_max@1 AS Int64) OR CAST(c1_min@0 AS Int64) <= 2 AND 2 <= CAST(c1_max@1
AS Int64) OR CAST(c1_min@0 AS Int64) <= 3 AND 3 <= CAST(c1_max@1 AS Int64)";
+ let predicate_expr = test_build_predicate_expression(
+ &expr,
+ &schema,
+ &mut RequiredStatColumns::new(),
+ );
+ assert_eq!(predicate_expr.to_string(), expected_expr);
let expr = Expr::InList {
expr: Box::new(cast(col("c1"), DataType::Int64)),
@@ -1635,12 +1818,15 @@ mod tests {
negated: true,
};
let expected_expr =
- "(CAST(c1_min AS Int64) != Int64(1) OR Int64(1) != CAST(c1_max AS
Int64)) \
- AND (CAST(c1_min AS Int64) != Int64(2) OR Int64(2) != CAST(c1_max AS
Int64)) \
- AND (CAST(c1_min AS Int64) != Int64(3) OR Int64(3) != CAST(c1_max AS
Int64))";
- let predicate_expr =
- build_predicate_expression(&expr, &schema, &mut
RequiredStatColumns::new());
- assert_eq!(format!("{predicate_expr:?}"), expected_expr);
+ "(CAST(c1_min@0 AS Int64) != 1 OR 1 != CAST(c1_max@1 AS Int64)) \
+ AND (CAST(c1_min@0 AS Int64) != 2 OR 2 != CAST(c1_max@1 AS Int64)) \
+ AND (CAST(c1_min@0 AS Int64) != 3 OR 3 != CAST(c1_max@1 AS Int64))";
+ let predicate_expr = test_build_predicate_expression(
+ &expr,
+ &schema,
+ &mut RequiredStatColumns::new(),
+ );
+ assert_eq!(predicate_expr.to_string(), expected_expr);
Ok(())
}
@@ -1655,6 +1841,7 @@ mod tests {
)]));
// s1 > 5
let expr = col("s1").gt(lit(ScalarValue::Decimal128(Some(500), 9, 2)));
+ let expr = logical2physical(&expr, &schema);
// If the data is written by spark, the physical data type is INT32 in
the parquet
// So we use the INT32 type of statistic.
let statistics = TestStatistics::new().with(
@@ -1672,6 +1859,7 @@ mod tests {
// with cast column to other type
let expr = cast(col("s1"), DataType::Decimal128(14, 3))
.gt(lit(ScalarValue::Decimal128(Some(5000), 14, 3)));
+ let expr = logical2physical(&expr, &schema);
let statistics = TestStatistics::new().with(
"s1",
ContainerStats::new_i32(
@@ -1687,6 +1875,7 @@ mod tests {
// with try cast column to other type
let expr = try_cast(col("s1"), DataType::Decimal128(14, 3))
.gt(lit(ScalarValue::Decimal128(Some(5000), 14, 3)));
+ let expr = logical2physical(&expr, &schema);
let statistics = TestStatistics::new().with(
"s1",
ContainerStats::new_i32(
@@ -1707,6 +1896,7 @@ mod tests {
)]));
// s1 > 5
let expr = col("s1").gt(lit(ScalarValue::Decimal128(Some(500), 18,
2)));
+ let expr = logical2physical(&expr, &schema);
// If the data is written by spark, the physical data type is INT64 in
the parquet
// So we use the INT32 type of statistic.
let statistics = TestStatistics::new().with(
@@ -1729,6 +1919,7 @@ mod tests {
)]));
// s1 > 5
let expr = col("s1").gt(lit(ScalarValue::Decimal128(Some(500), 23,
2)));
+ let expr = logical2physical(&expr, &schema);
let statistics = TestStatistics::new().with(
"s1",
ContainerStats::new_decimal128(
@@ -1753,6 +1944,7 @@ mod tests {
// Prune using s2 > 5
let expr = col("s2").gt(lit(5));
+ let expr = logical2physical(&expr, &schema);
let statistics = TestStatistics::new().with(
"s2",
@@ -1774,6 +1966,7 @@ mod tests {
// filter with cast
let expr = cast(col("s2"),
DataType::Int64).gt(lit(ScalarValue::Int64(Some(5))));
+ let expr = logical2physical(&expr, &schema);
let p = PruningPredicate::try_new(expr, schema).unwrap();
let result = p.prune(&statistics).unwrap();
let expected = vec![false, true, true, true];
@@ -1786,6 +1979,7 @@ mod tests {
// Prune using s2 != 'M'
let expr = col("s1").not_eq(lit("M"));
+ let expr = logical2physical(&expr, &schema);
let statistics = TestStatistics::new().with(
"s1",
@@ -1840,12 +2034,34 @@ mod tests {
(schema, statistics, expected_true, expected_false)
}
+ #[test]
+ fn prune_bool_const_expr() {
+ let (schema, statistics, _, _) = bool_setup();
+
+ // true
+ let expr = lit(true);
+ let expr = logical2physical(&expr, &schema);
+ let p = PruningPredicate::try_new(expr, schema.clone()).unwrap();
+ let result = p.prune(&statistics).unwrap();
+ assert_eq!(result, vec![true, true, true, true, true]);
+
+ // false
+ // constant literals that do NOT refer to any columns are currently
not evaluated at all, hence the result is
+ // "all true"
+ let expr = lit(false);
+ let expr = logical2physical(&expr, &schema);
+ let p = PruningPredicate::try_new(expr, schema).unwrap();
+ let result = p.prune(&statistics).unwrap();
+ assert_eq!(result, vec![true, true, true, true, true]);
+ }
+
#[test]
fn prune_bool_column() {
let (schema, statistics, expected_true, _) = bool_setup();
// b1
let expr = col("b1");
+ let expr = logical2physical(&expr, &schema);
let p = PruningPredicate::try_new(expr, schema).unwrap();
let result = p.prune(&statistics).unwrap();
assert_eq!(result, expected_true);
@@ -1857,6 +2073,7 @@ mod tests {
// !b1
let expr = col("b1").not();
+ let expr = logical2physical(&expr, &schema);
let p = PruningPredicate::try_new(expr, schema).unwrap();
let result = p.prune(&statistics).unwrap();
assert_eq!(result, expected_false);
@@ -1868,6 +2085,7 @@ mod tests {
// b1 = true
let expr = col("b1").eq(lit(true));
+ let expr = logical2physical(&expr, &schema);
let p = PruningPredicate::try_new(expr, schema).unwrap();
let result = p.prune(&statistics).unwrap();
assert_eq!(result, expected_true);
@@ -1879,6 +2097,7 @@ mod tests {
// !b1 = true
let expr = col("b1").not().eq(lit(true));
+ let expr = logical2physical(&expr, &schema);
let p = PruningPredicate::try_new(expr, schema).unwrap();
let result = p.prune(&statistics).unwrap();
assert_eq!(result, expected_false);
@@ -1920,12 +2139,14 @@ mod tests {
// i > 0
let expr = col("i").gt(lit(0));
+ let expr = logical2physical(&expr, &schema);
let p = PruningPredicate::try_new(expr, schema.clone()).unwrap();
let result = p.prune(&statistics).unwrap();
assert_eq!(result, expected_ret);
// -i < 0
let expr = Expr::Negative(Box::new(col("i"))).lt(lit(0));
+ let expr = logical2physical(&expr, &schema);
let p = PruningPredicate::try_new(expr, schema).unwrap();
let result = p.prune(&statistics).unwrap();
assert_eq!(result, expected_ret);
@@ -1945,12 +2166,14 @@ mod tests {
// i <= 0
let expr = col("i").lt_eq(lit(0));
+ let expr = logical2physical(&expr, &schema);
let p = PruningPredicate::try_new(expr, schema.clone()).unwrap();
let result = p.prune(&statistics).unwrap();
assert_eq!(result, expected_ret);
// -i >= 0
let expr = Expr::Negative(Box::new(col("i"))).gt_eq(lit(0));
+ let expr = logical2physical(&expr, &schema);
let p = PruningPredicate::try_new(expr, schema).unwrap();
let result = p.prune(&statistics).unwrap();
assert_eq!(result, expected_ret);
@@ -1970,26 +2193,30 @@ mod tests {
// cast(i as utf8) <= 0
let expr = cast(col("i"), DataType::Utf8).lt_eq(lit("0"));
+ let expr = logical2physical(&expr, &schema);
let p = PruningPredicate::try_new(expr, schema.clone()).unwrap();
let result = p.prune(&statistics).unwrap();
assert_eq!(result, expected_ret);
// try_cast(i as utf8) <= 0
let expr = try_cast(col("i"), DataType::Utf8).lt_eq(lit("0"));
+ let expr = logical2physical(&expr, &schema);
let p = PruningPredicate::try_new(expr, schema.clone()).unwrap();
let result = p.prune(&statistics).unwrap();
assert_eq!(result, expected_ret);
// cast(-i as utf8) >= 0
let expr =
- Expr::Negative(Box::new(cast(col("i"),
DataType::Utf8))).gt_eq(lit("0"));
+ cast(Expr::Negative(Box::new(col("i"))),
DataType::Utf8).gt_eq(lit("0"));
+ let expr = logical2physical(&expr, &schema);
let p = PruningPredicate::try_new(expr, schema.clone()).unwrap();
let result = p.prune(&statistics).unwrap();
assert_eq!(result, expected_ret);
// try_cast(-i as utf8) >= 0
let expr =
- Expr::Negative(Box::new(try_cast(col("i"),
DataType::Utf8))).gt_eq(lit("0"));
+ try_cast(Expr::Negative(Box::new(col("i"))),
DataType::Utf8).gt_eq(lit("0"));
+ let expr = logical2physical(&expr, &schema);
let p = PruningPredicate::try_new(expr, schema).unwrap();
let result = p.prune(&statistics).unwrap();
assert_eq!(result, expected_ret);
@@ -2009,6 +2236,7 @@ mod tests {
// i = 0
let expr = col("i").eq(lit(0));
+ let expr = logical2physical(&expr, &schema);
let p = PruningPredicate::try_new(expr, schema).unwrap();
let result = p.prune(&statistics).unwrap();
assert_eq!(result, expected_ret);
@@ -2027,11 +2255,13 @@ mod tests {
let expected_ret = vec![true, false, false, true, false];
let expr = cast(col("i"), DataType::Int64).eq(lit(0i64));
+ let expr = logical2physical(&expr, &schema);
let p = PruningPredicate::try_new(expr, schema.clone()).unwrap();
let result = p.prune(&statistics).unwrap();
assert_eq!(result, expected_ret);
let expr = try_cast(col("i"), DataType::Int64).eq(lit(0i64));
+ let expr = logical2physical(&expr, &schema);
let p = PruningPredicate::try_new(expr, schema).unwrap();
let result = p.prune(&statistics).unwrap();
assert_eq!(result, expected_ret);
@@ -2053,6 +2283,7 @@ mod tests {
let expected_ret = vec![true, true, true, true, true];
let expr = cast(col("i"), DataType::Utf8).eq(lit("0"));
+ let expr = logical2physical(&expr, &schema);
let p = PruningPredicate::try_new(expr, schema).unwrap();
let result = p.prune(&statistics).unwrap();
assert_eq!(result, expected_ret);
@@ -2072,12 +2303,14 @@ mod tests {
// i > -1
let expr = col("i").gt(lit(-1));
+ let expr = logical2physical(&expr, &schema);
let p = PruningPredicate::try_new(expr, schema.clone()).unwrap();
let result = p.prune(&statistics).unwrap();
assert_eq!(result, expected_ret);
// -i < 1
let expr = Expr::Negative(Box::new(col("i"))).lt(lit(1));
+ let expr = logical2physical(&expr, &schema);
let p = PruningPredicate::try_new(expr, schema).unwrap();
let result = p.prune(&statistics).unwrap();
assert_eq!(result, expected_ret);
@@ -2093,6 +2326,7 @@ mod tests {
// i IS NULL, no null statistics
let expr = col("i").is_null();
+ let expr = logical2physical(&expr, &schema);
let p = PruningPredicate::try_new(expr, schema.clone()).unwrap();
let result = p.prune(&statistics).unwrap();
assert_eq!(result, expected_ret);
@@ -2113,6 +2347,7 @@ mod tests {
// i IS NULL, with actual null statistcs
let expr = col("i").is_null();
+ let expr = logical2physical(&expr, &schema);
let p = PruningPredicate::try_new(expr, schema).unwrap();
let result = p.prune(&statistics).unwrap();
assert_eq!(result, expected_ret);
@@ -2126,12 +2361,14 @@ mod tests {
// i > int64(0)
let expr = col("i").gt(cast(lit(ScalarValue::Int64(Some(0))),
DataType::Int32));
+ let expr = logical2physical(&expr, &schema);
let p = PruningPredicate::try_new(expr, schema.clone()).unwrap();
let result = p.prune(&statistics).unwrap();
assert_eq!(result, expected_ret);
// cast(i as int64) > int64(0)
let expr = cast(col("i"),
DataType::Int64).gt(lit(ScalarValue::Int64(Some(0))));
+ let expr = logical2physical(&expr, &schema);
let p = PruningPredicate::try_new(expr, schema.clone()).unwrap();
let result = p.prune(&statistics).unwrap();
assert_eq!(result, expected_ret);
@@ -2139,6 +2376,7 @@ mod tests {
// try_cast(i as int64) > int64(0)
let expr =
try_cast(col("i"),
DataType::Int64).gt(lit(ScalarValue::Int64(Some(0))));
+ let expr = logical2physical(&expr, &schema);
let p = PruningPredicate::try_new(expr, schema.clone()).unwrap();
let result = p.prune(&statistics).unwrap();
assert_eq!(result, expected_ret);
@@ -2146,6 +2384,7 @@ mod tests {
// `-cast(i as int64) < 0` convert to `cast(i as int64) > -0`
let expr = Expr::Negative(Box::new(cast(col("i"), DataType::Int64)))
.lt(lit(ScalarValue::Int64(Some(0))));
+ let expr = logical2physical(&expr, &schema);
let p = PruningPredicate::try_new(expr, schema).unwrap();
let result = p.prune(&statistics).unwrap();
assert_eq!(result, expected_ret);
@@ -2154,10 +2393,13 @@ mod tests {
#[test]
fn test_rewrite_expr_to_prunable() {
let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
- let df_schema = DFSchema::try_from(schema).unwrap();
+ let df_schema = DFSchema::try_from(schema.clone()).unwrap();
+
// column op lit
let left_input = col("a");
+ let left_input = logical2physical(&left_input, &schema);
let right_input = lit(ScalarValue::Int32(Some(12)));
+ let right_input = logical2physical(&right_input, &schema);
let (result_left, _, result_right) = rewrite_expr_to_prunable(
&left_input,
Operator::Eq,
@@ -2165,11 +2407,14 @@ mod tests {
df_schema.clone(),
)
.unwrap();
- assert_eq!(result_left, left_input);
- assert_eq!(result_right, right_input);
+ assert_eq!(result_left.to_string(), left_input.to_string());
+ assert_eq!(result_right.to_string(), right_input.to_string());
+
// cast op lit
let left_input = cast(col("a"), DataType::Decimal128(20, 3));
+ let left_input = logical2physical(&left_input, &schema);
let right_input = lit(ScalarValue::Decimal128(Some(12), 20, 3));
+ let right_input = logical2physical(&right_input, &schema);
let (result_left, _, result_right) = rewrite_expr_to_prunable(
&left_input,
Operator::Gt,
@@ -2177,16 +2422,20 @@ mod tests {
df_schema.clone(),
)
.unwrap();
- assert_eq!(result_left, left_input);
- assert_eq!(result_right, right_input);
+ assert_eq!(result_left.to_string(), left_input.to_string());
+ assert_eq!(result_right.to_string(), right_input.to_string());
+
// try_cast op lit
let left_input = try_cast(col("a"), DataType::Int64);
+ let left_input = logical2physical(&left_input, &schema);
let right_input = lit(ScalarValue::Int64(Some(12)));
+ let right_input = logical2physical(&right_input, &schema);
let (result_left, _, result_right) =
rewrite_expr_to_prunable(&left_input, Operator::Gt, &right_input,
df_schema)
.unwrap();
- assert_eq!(result_left, left_input);
- assert_eq!(result_right, right_input);
+ assert_eq!(result_left.to_string(), left_input.to_string());
+ assert_eq!(result_right.to_string(), right_input.to_string());
+
// TODO: add test for other case and op
}
@@ -2195,9 +2444,11 @@ mod tests {
// cast string value to numeric value
// this cast is not supported
let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
- let df_schema = DFSchema::try_from(schema).unwrap();
+ let df_schema = DFSchema::try_from(schema.clone()).unwrap();
let left_input = cast(col("a"), DataType::Int64);
+ let left_input = logical2physical(&left_input, &schema);
let right_input = lit(ScalarValue::Int64(Some(12)));
+ let right_input = logical2physical(&right_input, &schema);
let result = rewrite_expr_to_prunable(
&left_input,
Operator::Gt,
@@ -2205,12 +2456,30 @@ mod tests {
df_schema.clone(),
);
assert!(result.is_err());
+
// other expr
let left_input = is_null(col("a"));
+ let left_input = logical2physical(&left_input, &schema);
let right_input = lit(ScalarValue::Int64(Some(12)));
+ let right_input = logical2physical(&right_input, &schema);
let result =
rewrite_expr_to_prunable(&left_input, Operator::Gt, &right_input,
df_schema);
assert!(result.is_err());
// TODO: add other negative test for other case and op
}
+
+ fn test_build_predicate_expression(
+ expr: &Expr,
+ schema: &Schema,
+ required_columns: &mut RequiredStatColumns,
+ ) -> Arc<dyn PhysicalExpr> {
+ let expr = logical2physical(expr, schema);
+ build_predicate_expression(&expr, schema, required_columns)
+ }
+
+ fn logical2physical(expr: &Expr, schema: &Schema) -> Arc<dyn PhysicalExpr>
{
+ let df_schema = schema.clone().to_dfschema().unwrap();
+ let execution_props = ExecutionProps::new();
+ create_physical_expr(expr, &df_schema, schema,
&execution_props).unwrap()
+ }
}
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs
b/datafusion/core/src/physical_plan/file_format/parquet.rs
index e2d8cc94d..a12672756 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -18,6 +18,7 @@
//! Execution plan for reading Parquet files
use arrow::datatypes::{DataType, SchemaRef};
+use datafusion_physical_expr::PhysicalExpr;
use fmt::Debug;
use std::any::Any;
use std::cmp::min;
@@ -47,7 +48,6 @@ use crate::{
};
use arrow::error::ArrowError;
use bytes::Bytes;
-use datafusion_expr::Expr;
use futures::future::BoxFuture;
use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
use itertools::Itertools;
@@ -97,7 +97,7 @@ pub struct ParquetExec {
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
/// Optional predicate for row filtering during parquet scan
- predicate: Option<Arc<Expr>>,
+ predicate: Option<Arc<dyn PhysicalExpr>>,
/// Optional predicate for pruning row groups
pruning_predicate: Option<Arc<PruningPredicate>>,
/// Optional predicate for pruning pages
@@ -112,7 +112,7 @@ impl ParquetExec {
/// Create a new Parquet reader execution plan provided file list and
schema.
pub fn new(
base_config: FileScanConfig,
- predicate: Option<Expr>,
+ predicate: Option<Arc<dyn PhysicalExpr>>,
metadata_size_hint: Option<usize>,
) -> Self {
debug!("Creating ParquetExec, files: {:?}, projection {:?}, predicate:
{:?}, limit: {:?}",
@@ -151,9 +151,6 @@ impl ParquetExec {
}
});
- // Save original predicate
- let predicate = predicate.map(Arc::new);
-
let (projected_schema, projected_statistics) = base_config.project();
Self {
@@ -462,7 +459,7 @@ struct ParquetOpener {
projection: Arc<[usize]>,
batch_size: usize,
limit: Option<usize>,
- predicate: Option<Arc<Expr>>,
+ predicate: Option<Arc<dyn PhysicalExpr>>,
pruning_predicate: Option<Arc<PruningPredicate>>,
page_pruning_predicate: Option<Arc<PagePruningPredicate>>,
table_schema: SchemaRef,
@@ -511,6 +508,7 @@ impl FileOpener for ParquetOpener {
.await?;
let adapted_projections =
schema_adapter.map_projections(builder.schema(), &projection)?;
+ // let predicate = predicate.map(|p| reassign_predicate_columns(p,
builder.schema(), true)).transpose()?;
let mask = ProjectionMask::roots(
builder.parquet_schema(),
@@ -520,7 +518,7 @@ impl FileOpener for ParquetOpener {
// Filter pushdown: evaluate predicates during scan
if let Some(predicate) =
pushdown_filters.then_some(predicate).flatten() {
let row_filter = row_filter::build_row_filter(
- predicate.as_ref(),
+ &predicate,
builder.schema().as_ref(),
table_schema.as_ref(),
builder.metadata(),
@@ -823,9 +821,11 @@ mod tests {
datatypes::{DataType, Field},
};
use chrono::{TimeZone, Utc};
- use datafusion_common::assert_contains;
use datafusion_common::ScalarValue;
- use datafusion_expr::{col, lit, when};
+ use datafusion_common::{assert_contains, ToDFSchema};
+ use datafusion_expr::{col, lit, when, Expr};
+ use datafusion_physical_expr::create_physical_expr;
+ use datafusion_physical_expr::execution_props::ExecutionProps;
use futures::StreamExt;
use object_store::local::LocalFileSystem;
use object_store::path::Path;
@@ -917,6 +917,9 @@ mod tests {
let (meta, _files) = store_parquet(batches,
multi_page).await.unwrap();
let file_groups = meta.into_iter().map(Into::into).collect();
+ // set up predicate (this is normally done by a layer higher up)
+ let predicate = predicate.map(|p| logical2physical(&p,
&file_schema));
+
// prepare the scan
let mut parquet_exec = ParquetExec::new(
FileScanConfig {
@@ -1863,7 +1866,7 @@ mod tests {
"pruning_predicate=c1_min@0 != bar OR bar != c1_max@1"
);
- assert_contains!(&display, r#"predicate=c1 != Utf8("bar")"#);
+ assert_contains!(&display, r#"predicate=c1@0 != bar"#);
assert_contains!(&display, "projection=[c1]");
}
@@ -1903,7 +1906,8 @@ mod tests {
// but does still has a pushdown down predicate
let predicate = rt.parquet_exec.predicate.as_ref();
- assert_eq!(predicate.unwrap().as_ref(), &filter);
+ let filter_phys = logical2physical(&filter,
rt.parquet_exec.schema().as_ref());
+ assert_eq!(predicate.unwrap().to_string(), filter_phys.to_string());
}
#[tokio::test]
@@ -2256,4 +2260,10 @@ mod tests {
Ok(())
}
+
+ fn logical2physical(expr: &Expr, schema: &Schema) -> Arc<dyn PhysicalExpr>
{
+ let df_schema = schema.clone().to_dfschema().unwrap();
+ let execution_props = ExecutionProps::new();
+ create_physical_expr(expr, &df_schema, schema,
&execution_props).unwrap()
+ }
}
diff --git
a/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
b/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
index 585f0c886..0853caabe 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
@@ -23,9 +23,9 @@ use arrow::array::{
};
use arrow::datatypes::DataType;
use arrow::{array::ArrayRef, datatypes::SchemaRef, error::ArrowError};
-use datafusion_common::{Column, DataFusionError, Result};
-use datafusion_expr::Expr;
-use datafusion_optimizer::utils::split_conjunction;
+use datafusion_common::{DataFusionError, Result};
+use datafusion_physical_expr::expressions::Column;
+use datafusion_physical_expr::{split_conjunction, PhysicalExpr};
use log::{debug, trace};
use parquet::schema::types::ColumnDescriptor;
use parquet::{
@@ -107,7 +107,7 @@ pub(crate) struct PagePruningPredicate {
impl PagePruningPredicate {
/// Create a new [`PagePruningPredicate`]
- pub fn try_new(expr: &Expr, schema: SchemaRef) -> Result<Self> {
+ pub fn try_new(expr: &Arc<dyn PhysicalExpr>, schema: SchemaRef) ->
Result<Self> {
let predicates = split_conjunction(expr)
.into_iter()
.filter_map(|predicate| {
@@ -253,7 +253,8 @@ fn find_column_index(
if let Some(found_required_column) = found_required_column.as_ref() {
// make sure it is the same name we have seen previously
assert_eq!(
- column.name, found_required_column.name,
+ column.name(),
+ found_required_column.name(),
"Unexpected multi column predicate"
);
} else {
@@ -272,11 +273,11 @@ fn find_column_index(
.columns()
.iter()
.enumerate()
- .find(|(_idx, c)| c.column_descr().name() == column.name)
+ .find(|(_idx, c)| c.column_descr().name() == column.name())
.map(|(idx, _c)| idx);
if col_idx.is_none() {
- trace!("Can not find column {} in row group meta", column.name);
+ trace!("Can not find column {} in row group meta", column.name());
}
col_idx
@@ -506,11 +507,11 @@ macro_rules! get_min_max_values_for_page_index {
}
impl<'a> PruningStatistics for PagesPruningStatistics<'a> {
- fn min_values(&self, _column: &Column) -> Option<ArrayRef> {
+ fn min_values(&self, _column: &datafusion_common::Column) ->
Option<ArrayRef> {
get_min_max_values_for_page_index!(self, min)
}
- fn max_values(&self, _column: &Column) -> Option<ArrayRef> {
+ fn max_values(&self, _column: &datafusion_common::Column) ->
Option<ArrayRef> {
get_min_max_values_for_page_index!(self, max)
}
@@ -518,7 +519,7 @@ impl<'a> PruningStatistics for PagesPruningStatistics<'a> {
self.col_offset_indexes.len()
}
- fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
+ fn null_counts(&self, _column: &datafusion_common::Column) ->
Option<ArrayRef> {
match self.col_page_indexes {
Index::NONE => None,
Index::BOOLEAN(index) => Some(Arc::new(Int64Array::from_iter(
diff --git
a/datafusion/core/src/physical_plan/file_format/parquet/row_filter.rs
b/datafusion/core/src/physical_plan/file_format/parquet/row_filter.rs
index 92c0ddc87..e1feafec1 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet/row_filter.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet/row_filter.rs
@@ -20,14 +20,15 @@ use arrow::datatypes::{DataType, Schema};
use arrow::error::{ArrowError, Result as ArrowResult};
use arrow::record_batch::RecordBatch;
use datafusion_common::cast::as_boolean_array;
-use datafusion_common::{Column, DataFusionError, Result, ScalarValue,
ToDFSchema};
-use datafusion_expr::expr_rewriter::{ExprRewritable, ExprRewriter,
RewriteRecursion};
+use datafusion_common::{DataFusionError, Result, ScalarValue};
+use datafusion_physical_expr::expressions::{Column, Literal};
+use datafusion_physical_expr::rewrite::{
+ RewriteRecursion, TreeNodeRewritable, TreeNodeRewriter,
+};
+use datafusion_physical_expr::utils::reassign_predicate_columns;
use std::collections::BTreeSet;
-use datafusion_expr::Expr;
-use datafusion_optimizer::utils::split_conjunction;
-use datafusion_physical_expr::execution_props::ExecutionProps;
-use datafusion_physical_expr::{create_physical_expr, PhysicalExpr};
+use datafusion_physical_expr::{split_conjunction, PhysicalExpr};
use parquet::arrow::arrow_reader::{ArrowPredicate, RowFilter};
use parquet::arrow::ProjectionMask;
use parquet::file::metadata::ParquetMetaData;
@@ -87,13 +88,8 @@ impl DatafusionArrowPredicate {
rows_filtered: metrics::Count,
time: metrics::Time,
) -> Result<Self> {
- let props = ExecutionProps::default();
-
- let schema = schema.project(&candidate.projection)?;
- let df_schema = schema.clone().to_dfschema()?;
-
- let physical_expr =
- create_physical_expr(&candidate.expr, &df_schema, &schema,
&props)?;
+ let schema = Arc::new(schema.project(&candidate.projection)?);
+ let physical_expr = reassign_predicate_columns(candidate.expr,
&schema, true)?;
// ArrowPredicate::evaluate is passed columns in the order they appear
in the file
// If the predicate has multiple columns, we therefore must project
the columns based
@@ -153,7 +149,7 @@ impl ArrowPredicate for DatafusionArrowPredicate {
/// expression as well as data to estimate the cost of evaluating
/// the resulting expression.
pub(crate) struct FilterCandidate {
- expr: Expr,
+ expr: Arc<dyn PhysicalExpr>,
required_bytes: usize,
can_use_index: bool,
projection: Vec<usize>,
@@ -167,7 +163,7 @@ pub(crate) struct FilterCandidate {
/// and any given file may or may not contain all columns in the merged
schema. If a particular column is not present
/// we replace the column expression with a literal expression that
produces a null value.
struct FilterCandidateBuilder<'a> {
- expr: Expr,
+ expr: Arc<dyn PhysicalExpr>,
file_schema: &'a Schema,
table_schema: &'a Schema,
required_column_indices: BTreeSet<usize>,
@@ -176,7 +172,11 @@ struct FilterCandidateBuilder<'a> {
}
impl<'a> FilterCandidateBuilder<'a> {
- pub fn new(expr: Expr, file_schema: &'a Schema, table_schema: &'a Schema)
-> Self {
+ pub fn new(
+ expr: Arc<dyn PhysicalExpr>,
+ file_schema: &'a Schema,
+ table_schema: &'a Schema,
+ ) -> Self {
Self {
expr,
file_schema,
@@ -192,7 +192,7 @@ impl<'a> FilterCandidateBuilder<'a> {
metadata: &ParquetMetaData,
) -> Result<Option<FilterCandidate>> {
let expr = self.expr.clone();
- let expr = expr.rewrite(&mut self)?;
+ let expr = expr.transform_using(&mut self)?;
if self.non_primitive_columns || self.projected_columns {
Ok(None)
@@ -211,16 +211,16 @@ impl<'a> FilterCandidateBuilder<'a> {
}
}
-impl<'a> ExprRewriter for FilterCandidateBuilder<'a> {
- fn pre_visit(&mut self, expr: &Expr) -> Result<RewriteRecursion> {
- if let Expr::Column(column) = expr {
- if let Ok(idx) = self.file_schema.index_of(&column.name) {
+impl<'a> TreeNodeRewriter<Arc<dyn PhysicalExpr>> for
FilterCandidateBuilder<'a> {
+ fn pre_visit(&mut self, node: &Arc<dyn PhysicalExpr>) ->
Result<RewriteRecursion> {
+ if let Some(column) = node.as_any().downcast_ref::<Column>() {
+ if let Ok(idx) = self.file_schema.index_of(column.name()) {
self.required_column_indices.insert(idx);
if
DataType::is_nested(self.file_schema.field(idx).data_type()) {
self.non_primitive_columns = true;
}
- } else if self.table_schema.index_of(&column.name).is_err() {
+ } else if self.table_schema.index_of(column.name()).is_err() {
// If the column does not exist in the (un-projected) table
schema then
// it must be a projected column.
self.projected_columns = true;
@@ -229,15 +229,15 @@ impl<'a> ExprRewriter for FilterCandidateBuilder<'a> {
Ok(RewriteRecursion::Continue)
}
- fn mutate(&mut self, expr: Expr) -> Result<Expr> {
- if let Expr::Column(Column { name, .. }) = &expr {
- if self.file_schema.field_with_name(name).is_err() {
+ fn mutate(&mut self, expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn
PhysicalExpr>> {
+ if let Some(column) = expr.as_any().downcast_ref::<Column>() {
+ if self.file_schema.field_with_name(column.name()).is_err() {
// the column expr must be in the table schema
- return match self.table_schema.field_with_name(name) {
+ return match self.table_schema.field_with_name(column.name()) {
Ok(field) => {
// return the null value corresponding to the data type
let null_value =
ScalarValue::try_from(field.data_type())?;
- Ok(Expr::Literal(null_value))
+ Ok(Arc::new(Literal::new(null_value)))
}
Err(e) => {
// If the column is not in the table schema, should
throw the error
@@ -308,7 +308,7 @@ fn columns_sorted(
/// Build a [`RowFilter`] from the given predicate `Expr`
pub fn build_row_filter(
- expr: &Expr,
+ expr: &Arc<dyn PhysicalExpr>,
file_schema: &Schema,
table_schema: &Schema,
metadata: &ParquetMetaData,
@@ -391,36 +391,14 @@ pub fn build_row_filter(
mod test {
use super::*;
use arrow::datatypes::Field;
- use datafusion_expr::{cast, col, lit};
+ use datafusion_common::ToDFSchema;
+ use datafusion_expr::{cast, col, lit, Expr};
+ use datafusion_physical_expr::create_physical_expr;
+ use datafusion_physical_expr::execution_props::ExecutionProps;
use parquet::arrow::parquet_to_arrow_schema;
use parquet::file::reader::{FileReader, SerializedFileReader};
use rand::prelude::*;
- // Assume a column expression for a column not in the table schema is a
projected column and ignore it
- #[test]
- #[should_panic(expected = "building candidate failed")]
- fn test_filter_candidate_builder_ignore_projected_columns() {
- let testdata = crate::test_util::parquet_test_data();
- let file =
std::fs::File::open(format!("{testdata}/alltypes_plain.parquet"))
- .expect("opening file");
-
- let reader = SerializedFileReader::new(file).expect("creating reader");
-
- let metadata = reader.metadata();
-
- let table_schema =
- parquet_to_arrow_schema(metadata.file_metadata().schema_descr(),
None)
- .expect("parsing schema");
-
- let expr = col("projected_column").eq(lit("value"));
-
- let candidate = FilterCandidateBuilder::new(expr, &table_schema,
&table_schema)
- .build(metadata)
- .expect("building candidate failed");
-
- assert!(candidate.is_none());
- }
-
// We should ignore predicate that read non-primitive columns
#[test]
fn test_filter_candidate_builder_ignore_complex_types() {
@@ -437,6 +415,7 @@ mod test {
.expect("parsing schema");
let expr = col("int64_list").is_not_null();
+ let expr = logical2physical(&expr, &table_schema);
let candidate = FilterCandidateBuilder::new(expr, &table_schema,
&table_schema)
.build(metadata)
@@ -467,8 +446,11 @@ mod test {
// The parquet file with `file_schema` just has `bigint_col` and
`float_col` column, and don't have the `int_col`
let expr = col("bigint_col").eq(cast(col("int_col"), DataType::Int64));
+ let expr = logical2physical(&expr, &table_schema);
let expected_candidate_expr =
col("bigint_col").eq(cast(lit(ScalarValue::Int32(None)),
DataType::Int64));
+ let expected_candidate_expr =
+ logical2physical(&expected_candidate_expr, &table_schema);
let candidate = FilterCandidateBuilder::new(expr, &file_schema,
&table_schema)
.build(metadata)
@@ -476,7 +458,10 @@ mod test {
assert!(candidate.is_some());
- assert_eq!(candidate.unwrap().expr, expected_candidate_expr);
+ assert_eq!(
+ candidate.unwrap().expr.to_string(),
+ expected_candidate_expr.to_string()
+ );
}
#[test]
@@ -496,4 +481,10 @@ mod test {
assert_eq!(projection, remapped)
}
}
+
+ fn logical2physical(expr: &Expr, schema: &Schema) -> Arc<dyn PhysicalExpr>
{
+ let df_schema = schema.clone().to_dfschema().unwrap();
+ let execution_props = ExecutionProps::new();
+ create_physical_expr(expr, &df_schema, schema,
&execution_props).unwrap()
+ }
}
diff --git
a/datafusion/core/src/physical_plan/file_format/parquet/row_groups.rs
b/datafusion/core/src/physical_plan/file_format/parquet/row_groups.rs
index 101102c92..4ba60f085 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet/row_groups.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet/row_groups.rs
@@ -244,7 +244,10 @@ mod tests {
use arrow::datatypes::DataType::Decimal128;
use arrow::datatypes::Schema;
use arrow::datatypes::{DataType, Field};
- use datafusion_expr::{cast, col, lit};
+ use datafusion_common::ToDFSchema;
+ use datafusion_expr::{cast, col, lit, Expr};
+ use datafusion_physical_expr::execution_props::ExecutionProps;
+ use datafusion_physical_expr::{create_physical_expr, PhysicalExpr};
use parquet::basic::LogicalType;
use parquet::data_type::{ByteArray, FixedLenByteArray};
use parquet::{
@@ -258,8 +261,9 @@ mod tests {
fn row_group_pruning_predicate_simple_expr() {
use datafusion_expr::{col, lit};
// int > 1 => c1_max > 1
- let expr = col("c1").gt(lit(15));
let schema = Schema::new(vec![Field::new("c1", DataType::Int32,
false)]);
+ let expr = col("c1").gt(lit(15));
+ let expr = logical2physical(&expr, &schema);
let pruning_predicate =
PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
let schema_descr = get_test_schema_descr(vec![(
@@ -290,8 +294,9 @@ mod tests {
fn row_group_pruning_predicate_missing_stats() {
use datafusion_expr::{col, lit};
// int > 1 => c1_max > 1
- let expr = col("c1").gt(lit(15));
let schema = Schema::new(vec![Field::new("c1", DataType::Int32,
false)]);
+ let expr = col("c1").gt(lit(15));
+ let expr = logical2physical(&expr, &schema);
let pruning_predicate =
PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
@@ -324,12 +329,15 @@ mod tests {
fn row_group_pruning_predicate_partial_expr() {
use datafusion_expr::{col, lit};
// test row group predicate with partially supported expression
- // int > 1 and int % 2 => c1_max > 1 and true
- let expr = col("c1").gt(lit(15)).and(col("c2").modulus(lit(2)));
+ // (int > 1) and ((int % 2) = 0) => c1_max > 1 and true
let schema = Arc::new(Schema::new(vec![
Field::new("c1", DataType::Int32, false),
Field::new("c2", DataType::Int32, false),
]));
+ let expr = col("c1")
+ .gt(lit(15))
+ .and(col("c2").modulus(lit(2)).eq(lit(0)));
+ let expr = logical2physical(&expr, &schema);
let pruning_predicate = PruningPredicate::try_new(expr,
schema.clone()).unwrap();
let schema_descr = get_test_schema_descr(vec![
@@ -362,7 +370,10 @@ mod tests {
// if conditions in predicate are joined with OR and an unsupported
expression is used
// this bypasses the entire predicate expression and no row groups are
filtered out
- let expr = col("c1").gt(lit(15)).or(col("c2").modulus(lit(2)));
+ let expr = col("c1")
+ .gt(lit(15))
+ .or(col("c2").modulus(lit(2)).eq(lit(0)));
+ let expr = logical2physical(&expr, &schema);
let pruning_predicate = PruningPredicate::try_new(expr,
schema).unwrap();
// if conditions in predicate are joined with OR and an unsupported
expression is used
@@ -399,11 +410,12 @@ mod tests {
fn row_group_pruning_predicate_null_expr() {
use datafusion_expr::{col, lit};
// int > 1 and IsNull(bool) => c1_max > 1 and bool_null_count > 0
- let expr = col("c1").gt(lit(15)).and(col("c2").is_null());
let schema = Arc::new(Schema::new(vec![
Field::new("c1", DataType::Int32, false),
Field::new("c2", DataType::Boolean, false),
]));
+ let expr = col("c1").gt(lit(15)).and(col("c2").is_null());
+ let expr = logical2physical(&expr, &schema);
let pruning_predicate = PruningPredicate::try_new(expr,
schema).unwrap();
let groups = gen_row_group_meta_data_for_pruning_predicate();
@@ -421,13 +433,14 @@ mod tests {
// test row group predicate with an unknown (Null) expr
//
// int > 1 and bool = NULL => c1_max > 1 and null
- let expr = col("c1")
- .gt(lit(15))
- .and(col("c2").eq(lit(ScalarValue::Boolean(None))));
let schema = Arc::new(Schema::new(vec![
Field::new("c1", DataType::Int32, false),
Field::new("c2", DataType::Boolean, false),
]));
+ let expr = col("c1")
+ .gt(lit(15))
+ .and(col("c2").eq(lit(ScalarValue::Boolean(None))));
+ let expr = logical2physical(&expr, &schema);
let pruning_predicate = PruningPredicate::try_new(expr,
schema).unwrap();
let groups = gen_row_group_meta_data_for_pruning_predicate();
@@ -448,7 +461,6 @@ mod tests {
// INT32: c1 > 5, the c1 is decimal(9,2)
// The type of scalar value if decimal(9,2), don't need to do cast
- let expr = col("c1").gt(lit(ScalarValue::Decimal128(Some(500), 9, 2)));
let schema =
Schema::new(vec![Field::new("c1", DataType::Decimal128(9, 2),
false)]);
let schema_descr = get_test_schema_descr(vec![(
@@ -462,6 +474,8 @@ mod tests {
Some(2),
None,
)]);
+ let expr = col("c1").gt(lit(ScalarValue::Decimal128(Some(500), 9, 2)));
+ let expr = logical2physical(&expr, &schema);
let pruning_predicate =
PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
let rgm1 = get_row_group_meta_data(
@@ -503,10 +517,6 @@ mod tests {
// The c1 type is decimal(9,0) in the parquet file, and the type of
scalar is decimal(5,2).
// We should convert all type to the coercion type, which is
decimal(11,2)
// The decimal of arrow is decimal(5,2), the decimal of parquet is
decimal(9,0)
- let expr = cast(col("c1"), DataType::Decimal128(11, 2)).gt(cast(
- lit(ScalarValue::Decimal128(Some(500), 5, 2)),
- Decimal128(11, 2),
- ));
let schema =
Schema::new(vec![Field::new("c1", DataType::Decimal128(9, 0),
false)]);
let schema_descr = get_test_schema_descr(vec![(
@@ -520,6 +530,11 @@ mod tests {
Some(0),
None,
)]);
+ let expr = cast(col("c1"), DataType::Decimal128(11, 2)).gt(cast(
+ lit(ScalarValue::Decimal128(Some(500), 5, 2)),
+ Decimal128(11, 2),
+ ));
+ let expr = logical2physical(&expr, &schema);
let pruning_predicate =
PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
let rgm1 = get_row_group_meta_data(
@@ -564,7 +579,6 @@ mod tests {
);
// INT64: c1 < 5, the c1 is decimal(18,2)
- let expr = col("c1").lt(lit(ScalarValue::Decimal128(Some(500), 18,
2)));
let schema =
Schema::new(vec![Field::new("c1", DataType::Decimal128(18, 2),
false)]);
let schema_descr = get_test_schema_descr(vec![(
@@ -578,6 +592,8 @@ mod tests {
Some(2),
None,
)]);
+ let expr = col("c1").lt(lit(ScalarValue::Decimal128(Some(500), 18,
2)));
+ let expr = logical2physical(&expr, &schema);
let pruning_predicate =
PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
let rgm1 = get_row_group_meta_data(
@@ -616,9 +632,6 @@ mod tests {
// the type of parquet is decimal(18,2)
let schema =
Schema::new(vec![Field::new("c1", DataType::Decimal128(18, 2),
false)]);
- // cast the type of c1 to decimal(28,3)
- let left = cast(col("c1"), DataType::Decimal128(28, 3));
- let expr = left.eq(lit(ScalarValue::Decimal128(Some(100000), 28, 3)));
let schema_descr = get_test_schema_descr(vec![(
"c1",
PhysicalType::FIXED_LEN_BYTE_ARRAY,
@@ -630,6 +643,10 @@ mod tests {
Some(2),
Some(16),
)]);
+ // cast the type of c1 to decimal(28,3)
+ let left = cast(col("c1"), DataType::Decimal128(28, 3));
+ let expr = left.eq(lit(ScalarValue::Decimal128(Some(100000), 28, 3)));
+ let expr = logical2physical(&expr, &schema);
let pruning_predicate =
PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
// we must use the big-endian when encode the i128 to bytes or vec[u8].
@@ -687,9 +704,6 @@ mod tests {
// the type of parquet is decimal(18,2)
let schema =
Schema::new(vec![Field::new("c1", DataType::Decimal128(18, 2),
false)]);
- // cast the type of c1 to decimal(28,3)
- let left = cast(col("c1"), DataType::Decimal128(28, 3));
- let expr = left.eq(lit(ScalarValue::Decimal128(Some(100000), 28, 3)));
let schema_descr = get_test_schema_descr(vec![(
"c1",
PhysicalType::BYTE_ARRAY,
@@ -701,6 +715,10 @@ mod tests {
Some(2),
Some(16),
)]);
+ // cast the type of c1 to decimal(28,3)
+ let left = cast(col("c1"), DataType::Decimal128(28, 3));
+ let expr = left.eq(lit(ScalarValue::Decimal128(Some(100000), 28, 3)));
+ let expr = logical2physical(&expr, &schema);
let pruning_predicate =
PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
// we must use the big-endian when encode the i128 to bytes or vec[u8].
@@ -821,4 +839,10 @@ mod tests {
let metrics = Arc::new(ExecutionPlanMetricsSet::new());
ParquetFileMetrics::new(0, "file.parquet", &metrics)
}
+
+ fn logical2physical(expr: &Expr, schema: &Schema) -> Arc<dyn PhysicalExpr>
{
+ let df_schema = schema.clone().to_dfschema().unwrap();
+ let execution_props = ExecutionProps::new();
+ create_physical_expr(expr, &df_schema, schema,
&execution_props).unwrap()
+ }
}
diff --git a/datafusion/core/tests/parquet/page_pruning.rs
b/datafusion/core/tests/parquet/page_pruning.rs
index 61e74e80d..baf9d2d36 100644
--- a/datafusion/core/tests/parquet/page_pruning.rs
+++ b/datafusion/core/tests/parquet/page_pruning.rs
@@ -25,8 +25,10 @@ use datafusion::execution::context::SessionState;
use datafusion::physical_plan::file_format::{FileScanConfig, ParquetExec};
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionContext;
-use datafusion_common::{ScalarValue, Statistics};
+use datafusion_common::{ScalarValue, Statistics, ToDFSchema};
use datafusion_expr::{col, lit, Expr};
+use datafusion_physical_expr::create_physical_expr;
+use datafusion_physical_expr::execution_props::ExecutionProps;
use object_store::path::Path;
use object_store::ObjectMeta;
use tokio_stream::StreamExt;
@@ -58,6 +60,11 @@ async fn get_parquet_exec(state: &SessionState, filter:
Expr) -> ParquetExec {
extensions: None,
};
+ let df_schema = schema.clone().to_dfschema().unwrap();
+ let execution_props = ExecutionProps::new();
+ let predicate =
+ create_physical_expr(&filter, &df_schema, &schema,
&execution_props).unwrap();
+
let parquet_exec = ParquetExec::new(
FileScanConfig {
object_store_url,
@@ -71,7 +78,7 @@ async fn get_parquet_exec(state: &SessionState, filter: Expr)
-> ParquetExec {
output_ordering: None,
infinite_source: false,
},
- Some(filter),
+ Some(predicate),
None,
);
parquet_exec.with_enable_page_index(true)
diff --git a/datafusion/core/tests/row.rs b/datafusion/core/tests/row.rs
index c04deb92b..5eeb237e1 100644
--- a/datafusion/core/tests/row.rs
+++ b/datafusion/core/tests/row.rs
@@ -115,7 +115,7 @@ async fn get_exec(
output_ordering: None,
infinite_source: false,
},
- &[],
+ None,
)
.await?;
Ok(exec)
diff --git a/datafusion/physical-expr/src/lib.rs
b/datafusion/physical-expr/src/lib.rs
index c9658a048..7a2ea6872 100644
--- a/datafusion/physical-expr/src/lib.rs
+++ b/datafusion/physical-expr/src/lib.rs
@@ -49,7 +49,7 @@ pub use aggregate::AggregateExpr;
pub use datafusion_common::from_slice;
pub use equivalence::EquivalenceProperties;
pub use equivalence::EquivalentClass;
-pub use physical_expr::{AnalysisContext, ExprBoundaries, PhysicalExpr};
+pub use physical_expr::{AnalysisContext, ExprBoundaries, PhysicalExpr,
PhysicalExprRef};
pub use planner::create_physical_expr;
pub use scalar_function::ScalarFunctionExpr;
pub use sort_expr::PhysicalSortExpr;
diff --git a/datafusion/physical-expr/src/physical_expr.rs
b/datafusion/physical-expr/src/physical_expr.rs
index 459ce8cd7..f4e9593c8 100644
--- a/datafusion/physical-expr/src/physical_expr.rs
+++ b/datafusion/physical-expr/src/physical_expr.rs
@@ -83,6 +83,9 @@ pub trait PhysicalExpr: Send + Sync + Display + Debug +
PartialEq<dyn Any> {
}
}
+/// Shared [`PhysicalExpr`].
+pub type PhysicalExprRef = Arc<dyn PhysicalExpr>;
+
/// The shared context used during the analysis of an expression. Includes
/// the boundaries for all known columns.
#[derive(Clone, Debug, PartialEq)]
diff --git a/datafusion/physical-expr/src/utils.rs
b/datafusion/physical-expr/src/utils.rs
index d6d5054ff..612d0e0b8 100644
--- a/datafusion/physical-expr/src/utils.rs
+++ b/datafusion/physical-expr/src/utils.rs
@@ -19,14 +19,18 @@ use crate::equivalence::EquivalentClass;
use crate::expressions::BinaryExpr;
use crate::expressions::Column;
use crate::expressions::UnKnownColumn;
+use crate::rewrite::RewriteRecursion;
use crate::rewrite::TreeNodeRewritable;
+use crate::rewrite::TreeNodeRewriter;
use crate::PhysicalSortExpr;
use crate::{EquivalenceProperties, PhysicalExpr};
+use datafusion_common::DataFusionError;
use datafusion_expr::Operator;
use arrow::datatypes::SchemaRef;
use std::collections::HashMap;
+use std::collections::HashSet;
use std::sync::Arc;
/// Compare the two expr lists are equal no matter the order.
@@ -235,6 +239,80 @@ pub fn ordering_satisfy_concrete<F: FnOnce() ->
EquivalenceProperties>(
}
}
+/// Extract referenced [`Column`]s within a [`PhysicalExpr`].
+///
+/// This works recursively.
+pub fn get_phys_expr_columns(pred: &Arc<dyn PhysicalExpr>) -> HashSet<Column> {
+ let mut rewriter = ColumnCollector::default();
+ pred.clone()
+ .transform_using(&mut rewriter)
+ .expect("never fail");
+ rewriter.cols
+}
+
+#[derive(Debug, Default)]
+struct ColumnCollector {
+ cols: HashSet<Column>,
+}
+
+impl TreeNodeRewriter<Arc<dyn PhysicalExpr>> for ColumnCollector {
+ fn pre_visit(
+ &mut self,
+ node: &Arc<dyn PhysicalExpr>,
+ ) -> Result<RewriteRecursion, DataFusionError> {
+ if let Some(column) = node.as_any().downcast_ref::<Column>() {
+ self.cols.insert(column.clone());
+ }
+ Ok(RewriteRecursion::Continue)
+ }
+
+ fn mutate(
+ &mut self,
+ expr: Arc<dyn PhysicalExpr>,
+ ) -> Result<Arc<dyn PhysicalExpr>, DataFusionError> {
+ Ok(expr)
+ }
+}
+
+/// Re-assign column indices referenced in predicate according to given schema.
+///
+/// This may be helpful when dealing with projections.
+pub fn reassign_predicate_columns(
+ pred: Arc<dyn PhysicalExpr>,
+ schema: &SchemaRef,
+ ignore_not_found: bool,
+) -> Result<Arc<dyn PhysicalExpr>, DataFusionError> {
+ let mut rewriter = ColumnAssigner {
+ schema: schema.clone(),
+ ignore_not_found,
+ };
+ pred.clone().transform_using(&mut rewriter)
+}
+
+#[derive(Debug)]
+struct ColumnAssigner {
+ schema: SchemaRef,
+ ignore_not_found: bool,
+}
+
+impl TreeNodeRewriter<Arc<dyn PhysicalExpr>> for ColumnAssigner {
+ fn mutate(
+ &mut self,
+ expr: Arc<dyn PhysicalExpr>,
+ ) -> Result<Arc<dyn PhysicalExpr>, DataFusionError> {
+ if let Some(column) = expr.as_any().downcast_ref::<Column>() {
+ let index = match self.schema.index_of(column.name()) {
+ Ok(idx) => idx,
+ Err(_) if self.ignore_not_found => usize::MAX,
+ Err(e) => return Err(e.into()),
+ };
+ return Ok(Arc::new(Column::new(column.name(), index)));
+ }
+
+ Ok(expr)
+ }
+}
+
#[cfg(test)]
mod tests {
diff --git a/datafusion/proto/proto/datafusion.proto
b/datafusion/proto/proto/datafusion.proto
index 103fe3fac..d0deb567f 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -1165,7 +1165,11 @@ message FileScanExecConf {
message ParquetScanExecNode {
FileScanExecConf base_conf = 1;
- LogicalExprNode pruning_predicate = 2;
+
+ // Was pruning predicate based on a logical expr.
+ reserved 2;
+
+ PhysicalExprNode predicate = 3;
}
message CsvScanExecNode {
diff --git a/datafusion/proto/src/generated/pbjson.rs
b/datafusion/proto/src/generated/pbjson.rs
index 335f6f1c5..7246b5f2b 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -11832,15 +11832,15 @@ impl serde::Serialize for ParquetScanExecNode {
if self.base_conf.is_some() {
len += 1;
}
- if self.pruning_predicate.is_some() {
+ if self.predicate.is_some() {
len += 1;
}
let mut struct_ser =
serializer.serialize_struct("datafusion.ParquetScanExecNode", len)?;
if let Some(v) = self.base_conf.as_ref() {
struct_ser.serialize_field("baseConf", v)?;
}
- if let Some(v) = self.pruning_predicate.as_ref() {
- struct_ser.serialize_field("pruningPredicate", v)?;
+ if let Some(v) = self.predicate.as_ref() {
+ struct_ser.serialize_field("predicate", v)?;
}
struct_ser.end()
}
@@ -11854,14 +11854,13 @@ impl<'de> serde::Deserialize<'de> for
ParquetScanExecNode {
const FIELDS: &[&str] = &[
"base_conf",
"baseConf",
- "pruning_predicate",
- "pruningPredicate",
+ "predicate",
];
#[allow(clippy::enum_variant_names)]
enum GeneratedField {
BaseConf,
- PruningPredicate,
+ Predicate,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
fn deserialize<D>(deserializer: D) ->
std::result::Result<GeneratedField, D::Error>
@@ -11884,7 +11883,7 @@ impl<'de> serde::Deserialize<'de> for
ParquetScanExecNode {
{
match value {
"baseConf" | "base_conf" =>
Ok(GeneratedField::BaseConf),
- "pruningPredicate" | "pruning_predicate" =>
Ok(GeneratedField::PruningPredicate),
+ "predicate" => Ok(GeneratedField::Predicate),
_ => Err(serde::de::Error::unknown_field(value,
FIELDS)),
}
}
@@ -11905,7 +11904,7 @@ impl<'de> serde::Deserialize<'de> for
ParquetScanExecNode {
V: serde::de::MapAccess<'de>,
{
let mut base_conf__ = None;
- let mut pruning_predicate__ = None;
+ let mut predicate__ = None;
while let Some(k) = map.next_key()? {
match k {
GeneratedField::BaseConf => {
@@ -11914,17 +11913,17 @@ impl<'de> serde::Deserialize<'de> for
ParquetScanExecNode {
}
base_conf__ = map.next_value()?;
}
- GeneratedField::PruningPredicate => {
- if pruning_predicate__.is_some() {
- return
Err(serde::de::Error::duplicate_field("pruningPredicate"));
+ GeneratedField::Predicate => {
+ if predicate__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("predicate"));
}
- pruning_predicate__ = map.next_value()?;
+ predicate__ = map.next_value()?;
}
}
}
Ok(ParquetScanExecNode {
base_conf: base_conf__,
- pruning_predicate: pruning_predicate__,
+ predicate: predicate__,
})
}
}
diff --git a/datafusion/proto/src/generated/prost.rs
b/datafusion/proto/src/generated/prost.rs
index 029380a99..da95fd558 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -1680,8 +1680,8 @@ pub struct FileScanExecConf {
pub struct ParquetScanExecNode {
#[prost(message, optional, tag = "1")]
pub base_conf: ::core::option::Option<FileScanExecConf>,
- #[prost(message, optional, tag = "2")]
- pub pruning_predicate: ::core::option::Option<LogicalExprNode>,
+ #[prost(message, optional, tag = "3")]
+ pub predicate: ::core::option::Option<PhysicalExprNode>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
diff --git a/datafusion/proto/src/physical_plan/mod.rs
b/datafusion/proto/src/physical_plan/mod.rs
index 0898ec416..8c2ce822f 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -53,7 +53,6 @@ use prost::Message;
use crate::common::proto_error;
use crate::common::{csv_delimiter_to_string, str_to_byte};
-use crate::logical_plan;
use crate::physical_plan::from_proto::{
parse_physical_expr, parse_protobuf_file_scan_config,
};
@@ -156,19 +155,22 @@ impl AsExecutionPlan for PhysicalPlanNode {
FileCompressionType::UNCOMPRESSED,
))),
PhysicalPlanType::ParquetScan(scan) => {
+ let base_config = parse_protobuf_file_scan_config(
+ scan.base_conf.as_ref().unwrap(),
+ registry,
+ )?;
let predicate = scan
- .pruning_predicate
+ .predicate
.as_ref()
- .map(|expr| logical_plan::from_proto::parse_expr(expr,
registry))
+ .map(|expr| {
+ parse_physical_expr(
+ expr,
+ registry,
+ base_config.file_schema.as_ref(),
+ )
+ })
.transpose()?;
- Ok(Arc::new(ParquetExec::new(
- parse_protobuf_file_scan_config(
- scan.base_conf.as_ref().unwrap(),
- registry,
- )?,
- predicate,
- None,
- )))
+ Ok(Arc::new(ParquetExec::new(base_config, predicate, None)))
}
PhysicalPlanType::AvroScan(scan) => {
Ok(Arc::new(AvroExec::new(parse_protobuf_file_scan_config(
@@ -956,13 +958,13 @@ impl AsExecutionPlan for PhysicalPlanNode {
} else if let Some(exec) = plan.downcast_ref::<ParquetExec>() {
let pruning_expr = exec
.pruning_predicate()
- .map(|pred| pred.logical_expr().try_into())
+ .map(|pred| pred.orig_expr().clone().try_into())
.transpose()?;
Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::ParquetScan(
protobuf::ParquetScanExecNode {
base_conf: Some(exec.base_config().try_into()?),
- pruning_predicate: pruning_expr,
+ predicate: pruning_expr,
},
)),
})
@@ -1218,7 +1220,7 @@ mod roundtrip_tests {
use datafusion::physical_expr::expressions::DateTimeIntervalExpr;
use datafusion::physical_expr::ScalarFunctionExpr;
use datafusion::physical_plan::aggregates::PhysicalGroupBy;
- use datafusion::physical_plan::expressions::{like, GetIndexedFieldExpr};
+ use datafusion::physical_plan::expressions::{like, BinaryExpr,
GetIndexedFieldExpr};
use datafusion::physical_plan::functions;
use datafusion::physical_plan::functions::make_scalar_function;
use datafusion::physical_plan::projection::ProjectionExec;
@@ -1510,7 +1512,11 @@ mod roundtrip_tests {
infinite_source: false,
};
- let predicate =
datafusion::prelude::col("col").eq(datafusion::prelude::lit("1"));
+ let predicate = Arc::new(BinaryExpr::new(
+ Arc::new(Column::new("col", 1)),
+ Operator::Eq,
+ lit("1"),
+ ));
roundtrip_test(Arc::new(ParquetExec::new(
scan_config,
Some(predicate),
diff --git a/parquet-test-utils/src/lib.rs b/parquet-test-utils/src/lib.rs
index e1b3c5c18..59c3024d7 100644
--- a/parquet-test-utils/src/lib.rs
+++ b/parquet-test-utils/src/lib.rs
@@ -158,7 +158,11 @@ impl TestParquetFile {
&ExecutionProps::default(),
)?;
- let parquet_exec = Arc::new(ParquetExec::new(scan_config,
Some(filter), None));
+ let parquet_exec = Arc::new(ParquetExec::new(
+ scan_config,
+ Some(physical_filter_expr.clone()),
+ None,
+ ));
let exec = Arc::new(FilterExec::try_new(physical_filter_expr,
parquet_exec)?);