This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 9fbee1ae5 Pushdown `RowFilter` in `ParquetExec` (#3380)
9fbee1ae5 is described below
commit 9fbee1ae5001c91e4ad991ec8918e9eb0359ab3f
Author: Dan Harris <[email protected]>
AuthorDate: Tue Sep 13 15:16:26 2022 -0400
Pushdown `RowFilter` in `ParquetExec` (#3380)
* Initial implementation
* Remove debugging cruft
* Add license header
* PR comments
* no need to prep null mask
* PR comments
* unit tests
* Fix comment
* fix doc string
* Update datafusion/core/src/physical_plan/file_format/row_filter.rs
Co-authored-by: Andrew Lamb <[email protected]>
* Update datafusion/expr/src/expr_fn.rs
Co-authored-by: Andrew Lamb <[email protected]>
* Update datafusion/core/src/physical_plan/file_format/parquet.rs
Co-authored-by: Andrew Lamb <[email protected]>
* PR comments
* Fix compiler error after rebase
Co-authored-by: Andrew Lamb <[email protected]>
---
datafusion/core/src/physical_optimizer/pruning.rs | 1 +
.../core/src/physical_plan/file_format/mod.rs | 1 +
.../core/src/physical_plan/file_format/parquet.rs | 190 ++++++++--
.../src/physical_plan/file_format/row_filter.rs | 400 +++++++++++++++++++++
datafusion/expr/src/expr_fn.rs | 84 +++++
5 files changed, 652 insertions(+), 24 deletions(-)
diff --git a/datafusion/core/src/physical_optimizer/pruning.rs
b/datafusion/core/src/physical_optimizer/pruning.rs
index 73f6c795c..f5433d7e8 100644
--- a/datafusion/core/src/physical_optimizer/pruning.rs
+++ b/datafusion/core/src/physical_optimizer/pruning.rs
@@ -44,6 +44,7 @@ use arrow::{
record_batch::RecordBatch,
};
use datafusion_expr::expr_rewriter::{ExprRewritable, ExprRewriter};
+
use datafusion_expr::utils::expr_to_columns;
use datafusion_expr::{binary_expr, cast, try_cast, ExprSchemable};
use datafusion_physical_expr::create_physical_expr;
diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs
b/datafusion/core/src/physical_plan/file_format/mod.rs
index 31fa1f2d8..707ed039c 100644
--- a/datafusion/core/src/physical_plan/file_format/mod.rs
+++ b/datafusion/core/src/physical_plan/file_format/mod.rs
@@ -25,6 +25,7 @@ mod delimited_stream;
mod file_stream;
mod json;
mod parquet;
+mod row_filter;
pub(crate) use self::csv::plan_to_csv;
pub use self::csv::CsvExec;
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs
b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 14e679c7f..a6ce44b22 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -29,6 +29,7 @@ use crate::datasource::listing::FileRange;
use crate::physical_plan::file_format::file_stream::{
FileOpenFuture, FileOpener, FileStream,
};
+use crate::physical_plan::file_format::row_filter::build_row_filter;
use crate::physical_plan::file_format::FileMeta;
use crate::{
error::{DataFusionError, Result},
@@ -67,6 +68,30 @@ use parquet::file::{
};
use parquet::schema::types::ColumnDescriptor;
+#[derive(Debug, Clone, Default)]
+/// Specify options for the parquet scan
+pub struct ParquetScanOptions {
+ /// If true, any available `pruning_predicate` will be converted to a
`RowFilter`
+ /// and pushed down to the `ParquetRecordBatchStream`. This will enable
row level
+ /// filter at the decoder level. Defaults to false
+ pushdown_filters: bool,
+ /// If true, the generated `RowFilter` may reorder the predicate `Expr`s
to try and optimize
+ /// the cost of filter evaluation.
+ reorder_predicates: bool,
+}
+
+impl ParquetScanOptions {
+ pub fn with_pushdown_filters(mut self, pushdown_filters: bool) -> Self {
+ self.pushdown_filters = pushdown_filters;
+ self
+ }
+
+ pub fn with_reorder_predicates(mut self, reorder_predicates: bool) -> Self
{
+ self.reorder_predicates = reorder_predicates;
+ self
+ }
+}
+
/// Execution plan for scanning one or more Parquet partitions
#[derive(Debug, Clone)]
pub struct ParquetExec {
@@ -81,6 +106,8 @@ pub struct ParquetExec {
metadata_size_hint: Option<usize>,
/// Optional user defined parquet file reader factory
parquet_file_reader_factory: Option<Arc<dyn ParquetFileReaderFactory>>,
+ /// Options to specify behavior of parquet scan
+ scan_options: ParquetScanOptions,
}
impl ParquetExec {
@@ -121,6 +148,7 @@ impl ParquetExec {
pruning_predicate,
metadata_size_hint,
parquet_file_reader_factory: None,
+ scan_options: ParquetScanOptions::default(),
}
}
@@ -148,6 +176,12 @@ impl ParquetExec {
self.parquet_file_reader_factory = Some(parquet_file_reader_factory);
self
}
+
+    /// Replace the [`ParquetScanOptions`] that control this scan and return the
+    /// updated plan.
+    pub fn with_scan_options(self, scan_options: ParquetScanOptions) -> Self {
+        Self {
+            scan_options,
+            ..self
+        }
+    }
}
/// Stores metrics about the parquet execution for a particular parquet file.
@@ -258,6 +292,7 @@ impl ExecutionPlan for ParquetExec {
metadata_size_hint: self.metadata_size_hint,
metrics: self.metrics.clone(),
parquet_file_reader_factory,
+ scan_options: self.scan_options.clone(),
};
let stream = FileStream::new(
@@ -319,6 +354,7 @@ struct ParquetOpener {
metadata_size_hint: Option<usize>,
metrics: ExecutionPlanMetricsSet,
parquet_file_reader_factory: Arc<dyn ParquetFileReaderFactory>,
+ scan_options: ParquetScanOptions,
}
impl FileOpener for ParquetOpener {
@@ -347,9 +383,12 @@ impl FileOpener for ParquetOpener {
let batch_size = self.batch_size;
let projection = self.projection.clone();
let pruning_predicate = self.pruning_predicate.clone();
+ let table_schema = self.table_schema.clone();
+ let reorder_predicates = self.scan_options.reorder_predicates;
+ let pushdown_filters = self.scan_options.pushdown_filters;
Ok(Box::pin(async move {
- let builder = ParquetRecordBatchStreamBuilder::new(reader).await?;
+ let mut builder =
ParquetRecordBatchStreamBuilder::new(reader).await?;
let adapted_projections =
schema_adapter.map_projections(builder.schema(), &projection)?;
@@ -358,6 +397,21 @@ impl FileOpener for ParquetOpener {
adapted_projections.iter().cloned(),
);
+ if let Some(predicate) = pushdown_filters
+ .then(|| pruning_predicate.as_ref().map(|p| p.logical_expr()))
+ .flatten()
+ {
+ if let Ok(Some(filter)) = build_row_filter(
+ predicate.clone(),
+ builder.schema().as_ref(),
+ table_schema.as_ref(),
+ builder.metadata(),
+ reorder_predicates,
+ ) {
+ builder = builder.with_row_filter(filter);
+ }
+ };
+
let groups = builder.metadata().row_groups();
let row_groups =
prune_row_groups(groups, file_range, pruning_predicate,
&metrics);
@@ -839,6 +893,7 @@ mod tests {
projection: Option<Vec<usize>>,
schema: Option<SchemaRef>,
predicate: Option<Expr>,
+ pushdown_predicate: bool,
) -> Result<Vec<RecordBatch>> {
let file_schema = match schema {
Some(schema) => schema,
@@ -851,7 +906,7 @@ mod tests {
let file_groups = meta.into_iter().map(Into::into).collect();
// prepare the scan
- let parquet_exec = ParquetExec::new(
+ let mut parquet_exec = ParquetExec::new(
FileScanConfig {
object_store_url: ObjectStoreUrl::local_filesystem(),
file_groups: vec![file_groups],
@@ -865,6 +920,14 @@ mod tests {
None,
);
+ if pushdown_predicate {
+ parquet_exec = parquet_exec.with_scan_options(
+ ParquetScanOptions::default()
+ .with_pushdown_filters(true)
+ .with_reorder_predicates(true),
+ );
+ }
+
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
collect(Arc::new(parquet_exec), task_ctx).await
@@ -912,9 +975,10 @@ mod tests {
let batch3 = add_to_batch(&batch1, "c3", c3);
// read/write them files:
- let read = round_trip_to_parquet(vec![batch1, batch2, batch3], None,
None, None)
- .await
- .unwrap();
+ let read =
+ round_trip_to_parquet(vec![batch1, batch2, batch3], None, None,
None, false)
+ .await
+ .unwrap();
let expected = vec![
"+-----+----+----+",
"| c1 | c2 | c3 |",
@@ -953,7 +1017,7 @@ mod tests {
let batch2 = create_batch(vec![("c3", c3), ("c2", c2), ("c1", c1)]);
// read/write them files:
- let read = round_trip_to_parquet(vec![batch1, batch2], None, None,
None)
+ let read = round_trip_to_parquet(vec![batch1, batch2], None, None,
None, false)
.await
.unwrap();
let expected = vec![
@@ -987,7 +1051,7 @@ mod tests {
let batch2 = create_batch(vec![("c3", c3), ("c2", c2)]);
// read/write them files:
- let read = round_trip_to_parquet(vec![batch1, batch2], None, None,
None)
+ let read = round_trip_to_parquet(vec![batch1, batch2], None, None,
None, false)
.await
.unwrap();
let expected = vec![
@@ -1020,24 +1084,60 @@ mod tests {
// batch2: c3(int8), c2(int64)
let batch2 = create_batch(vec![("c3", c3), ("c2", c2)]);
- let filter = col("c2").eq(lit(0_i64));
+ let filter = col("c2").eq(lit(2_i64));
// read/write them files:
- let read = round_trip_to_parquet(vec![batch1, batch2], None, None,
Some(filter))
- .await
- .unwrap();
+ let read =
+ round_trip_to_parquet(vec![batch1, batch2], None, None,
Some(filter), false)
+ .await
+ .unwrap();
let expected = vec![
"+-----+----+----+",
"| c1 | c3 | c2 |",
"+-----+----+----+",
- "| Foo | 10 | |",
+ "| | | |",
+ "| | 10 | 1 |",
"| | 20 | |",
+ "| | 20 | 2 |",
+ "| Foo | 10 | |",
"| bar | | |",
"+-----+----+----+",
];
assert_batches_sorted_eq!(expected, &read);
}
+ #[tokio::test]
+ async fn evolved_schema_intersection_filter_with_filter_pushdown() {
+ // Two files whose schemas overlap only in `c3`; only the second file has `c2`.
+ let c1: ArrayRef =
+ Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+
+ let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
+
+ let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));
+
+ // batch1: c1(string), c3(int8)
+ let batch1 = create_batch(vec![("c1", c1), ("c3", c3.clone())]);
+
+ // batch2: c3(int8), c2(int64)
+ let batch2 = create_batch(vec![("c3", c3), ("c2", c2)]);
+
+ let filter = col("c2").eq(lit(2_i64));
+
+ // read/write them files, pushing the filter down into the parquet decoder
+ // as a `RowFilter` (last argument `true`)
+ let read =
+ round_trip_to_parquet(vec![batch1, batch2], None, None, Some(filter), true)
+ .await
+ .unwrap();
+ // The pushed-down filter is applied row by row, so only the single row with
+ // `c2 == 2` is returned; rows from the file lacking `c2` do not match either.
+ let expected = vec![
+ "+----+----+----+",
+ "| c1 | c3 | c2 |",
+ "+----+----+----+",
+ "| | 20 | 2 |",
+ "+----+----+----+",
+ ];
+ assert_batches_sorted_eq!(expected, &read);
+ }
+
#[tokio::test]
async fn evolved_schema_projection() {
let c1: ArrayRef =
@@ -1061,10 +1161,15 @@ mod tests {
let batch2 = create_batch(vec![("c3", c3), ("c2", c2), ("c1", c1),
("c4", c4)]);
// read/write them files:
- let read =
- round_trip_to_parquet(vec![batch1, batch2], Some(vec![0, 3]),
None, None)
- .await
- .unwrap();
+ let read = round_trip_to_parquet(
+ vec![batch1, batch2],
+ Some(vec![0, 3]),
+ None,
+ None,
+ false,
+ )
+ .await
+ .unwrap();
let expected = vec![
"+-----+-----+",
"| c1 | c4 |",
@@ -1102,9 +1207,10 @@ mod tests {
let filter = col("c3").eq(lit(0_i8));
// read/write them files:
- let read = round_trip_to_parquet(vec![batch1, batch2], None, None,
Some(filter))
- .await
- .unwrap();
+ let read =
+ round_trip_to_parquet(vec![batch1, batch2], None, None,
Some(filter), false)
+ .await
+ .unwrap();
// Predicate should prune all row groups
assert_eq!(read.len(), 0);
@@ -1123,12 +1229,13 @@ mod tests {
// batch2: c2(int64)
let batch2 = create_batch(vec![("c2", c2)]);
- let filter = col("c2").eq(lit(0_i64));
+ let filter = col("c2").eq(lit(1_i64));
// read/write them files:
- let read = round_trip_to_parquet(vec![batch1, batch2], None, None,
Some(filter))
- .await
- .unwrap();
+ let read =
+ round_trip_to_parquet(vec![batch1, batch2], None, None,
Some(filter), false)
+ .await
+ .unwrap();
// This does not look correct since the "c2" values in the result do
not in fact match the predicate `c2 == 0`
// but parquet pruning is not exact. If the min/max values are not
defined (which they are not in this case since the it is
@@ -1139,14 +1246,48 @@ mod tests {
"+-----+----+",
"| c1 | c2 |",
"+-----+----+",
- "| Foo | |",
"| | |",
+ "| | |",
+ "| | 1 |",
+ "| | 2 |",
+ "| Foo | |",
"| bar | |",
"+-----+----+",
];
assert_batches_sorted_eq!(expected, &read);
}
+ #[tokio::test]
+ async fn evolved_schema_disjoint_schema_filter_with_pushdown() {
+ // Two files with no column in common: the first has only `c1`, the second only `c2`.
+ let c1: ArrayRef =
+ Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+
+ let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
+
+ // batch1: c1(string)
+ let batch1 = create_batch(vec![("c1", c1.clone())]);
+
+ // batch2: c2(int64)
+ let batch2 = create_batch(vec![("c2", c2)]);
+
+ let filter = col("c2").eq(lit(1_i64));
+
+ // read/write them files, pushing the filter down into the parquet decoder
+ // as a `RowFilter` (last argument `true`)
+ let read =
+ round_trip_to_parquet(vec![batch1, batch2], None, None, Some(filter), true)
+ .await
+ .unwrap();
+
+ // Only the row with `c2 == 1` survives; every row of the file without `c2`
+ // is filtered out as well.
+ let expected = vec![
+ "+----+----+",
+ "| c1 | c2 |",
+ "+----+----+",
+ "| | 1 |",
+ "+----+----+",
+ ];
+ assert_batches_sorted_eq!(expected, &read);
+ }
+
#[tokio::test]
async fn evolved_schema_incompatible_types() {
let c1: ArrayRef =
@@ -1181,6 +1322,7 @@ mod tests {
None,
Some(Arc::new(schema)),
None,
+ false,
)
.await;
assert_contains!(read.unwrap_err().to_string(),
diff --git a/datafusion/core/src/physical_plan/file_format/row_filter.rs
b/datafusion/core/src/physical_plan/file_format/row_filter.rs
new file mode 100644
index 000000000..56bdba557
--- /dev/null
+++ b/datafusion/core/src/physical_plan/file_format/row_filter.rs
@@ -0,0 +1,400 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::{Array, BooleanArray};
+use arrow::datatypes::{DataType, Field, Schema};
+use arrow::error::{ArrowError, Result as ArrowResult};
+use arrow::record_batch::RecordBatch;
+use datafusion_common::{Column, Result, ScalarValue, ToDFSchema};
+use datafusion_expr::expr_rewriter::{ExprRewritable, ExprRewriter,
RewriteRecursion};
+
+use datafusion_expr::{uncombine_filter, Expr};
+use datafusion_physical_expr::execution_props::ExecutionProps;
+use datafusion_physical_expr::{create_physical_expr, PhysicalExpr};
+use parquet::arrow::arrow_reader::{ArrowPredicate, RowFilter};
+use parquet::arrow::ProjectionMask;
+use parquet::file::metadata::ParquetMetaData;
+use std::sync::Arc;
+
+/// This module contains utilities for enabling the pushdown of DataFusion filter predicates (which
+/// can be any DataFusion `Expr` that evaluates to a `BooleanArray`) to the parquet decoder level in `arrow-rs`.
+/// DataFusion will use a `ParquetRecordBatchStream` to read data from parquet into arrow `RecordBatch`es.
+/// When constructing the `ParquetRecordBatchStream` you can provide a `RowFilter` which is itself just a vector
+/// of `Box<dyn ArrowPredicate>`. During decoding, the predicates are evaluated to generate a mask which is used
+/// to avoid decoding rows in projected columns which are not selected, which can significantly reduce the amount
+/// of compute required for decoding.
+///
+/// Since the predicates are applied serially in the order defined in the `RowFilter`, the optimal ordering
+/// will depend on the exact filters. The best filters to execute first have two properties:
+/// 1. They are relatively inexpensive to evaluate (e.g. they read column chunks which are relatively small)
+/// 2. They filter a lot of rows, reducing the amount of decoding required for subsequent filters and projected columns
+///
+/// Given the metadata exposed by parquet, the selectivity of filters is not easy to estimate, so the heuristics we use
+/// here primarily focus on the evaluation cost.
+///
+/// The basic algorithm for constructing the `RowFilter` is as follows
+/// 1. Recursively break conjunctions into separate predicates. An expression like `a = 1 AND (b = 2 AND c = 3)` would be
+///    separated into the expressions `a = 1`, `b = 2`, and `c = 3`.
+/// 2. Determine whether each predicate is suitable as an `ArrowPredicate`. As long as the predicate does not reference
+///    any projected columns or columns with non-primitive types, it is considered suitable.
+/// 3. Determine, for each predicate, the total compressed size of all columns required to evaluate the predicate.
+/// 4. Determine, for each predicate, whether all columns required to evaluate the expression are sorted.
+/// 5. Re-order the predicates by total size (from step 3).
+/// 6. Partition the predicates according to whether they are sorted (from step 4).
+/// 7. "Compile" each predicate `Expr` to a `DatafusionArrowPredicate`.
+/// 8. Build the `RowFilter` with the sorted predicates followed by the unsorted predicates. Within each partition
+///    the predicates will still be sorted by size.
+///
+/// Steps 5 and 6 are only performed when predicate reordering is enabled; otherwise the predicates keep the order
+/// in which they appear in the conjunction.
+
+/// A predicate which can be passed to `ParquetRecordBatchStream` to perform row-level
+/// filtering during parquet decoding.
+#[derive(Debug)]
+pub(crate) struct DatafusionArrowPredicate {
+    /// The compiled predicate expression
+    physical_expr: Arc<dyn PhysicalExpr>,
+    /// The parquet root columns that must be decoded to evaluate `physical_expr`
+    projection: ProjectionMask,
+}
+
+impl DatafusionArrowPredicate {
+    /// Compile `candidate.expr` against `schema` restricted to the candidate's
+    /// column indices, and build the parquet `ProjectionMask` selecting those
+    /// same root columns from the file described by `metadata`.
+    pub fn try_new(
+        candidate: FilterCandidate,
+        schema: &Schema,
+        metadata: &ParquetMetaData,
+    ) -> Result<Self> {
+        let projected_schema = schema.project(&candidate.projection)?;
+        let projected_df_schema = projected_schema.clone().to_dfschema()?;
+        let physical_expr = create_physical_expr(
+            &candidate.expr,
+            &projected_df_schema,
+            &projected_schema,
+            &ExecutionProps::default(),
+        )?;
+
+        let schema_descr = metadata.file_metadata().schema_descr();
+        let projection = ProjectionMask::roots(schema_descr, candidate.projection);
+
+        Ok(Self {
+            physical_expr,
+            projection,
+        })
+    }
+}
+
+impl ArrowPredicate for DatafusionArrowPredicate {
+    fn projection(&self) -> &ProjectionMask {
+        &self.projection
+    }
+
+    /// Evaluate the compiled predicate against `batch` and return the selection
+    /// mask. A DataFusion evaluation error, or a result that is not a
+    /// `BooleanArray`, is reported as `ArrowError::ComputeError`.
+    fn evaluate(&mut self, batch: RecordBatch) -> ArrowResult<BooleanArray> {
+        let num_rows = batch.num_rows();
+        let array = self
+            .physical_expr
+            .evaluate(&batch)
+            .map(|value| value.into_array(num_rows))
+            .map_err(|e| {
+                ArrowError::ComputeError(format!(
+                    "Error evaluating filter predicate: {:?}",
+                    e
+                ))
+            })?;
+
+        let mask = array
+            .as_any()
+            .downcast_ref::<BooleanArray>()
+            .ok_or_else(|| {
+                ArrowError::ComputeError(
+                    "Unexpected result of predicate evaluation, expected BooleanArray"
+                        .to_owned(),
+                )
+            })?;
+
+        Ok(BooleanArray::from(mask.data().clone()))
+    }
+}
+
+/// A candidate expression for creating a `RowFilter` contains the
+/// expression as well as data to estimate the cost of evaluating
+/// the resulting expression.
+pub(crate) struct FilterCandidate {
+ /// The predicate, with references to columns missing from the file schema
+ /// rewritten to null literals by `FilterCandidateBuilder`
+ expr: Expr,
+ /// Total compressed size, summed over all row groups, of the columns `expr` reads
+ required_bytes: usize,
+ /// Whether all columns read by `expr` are known to be sorted
+ can_use_index: bool,
+ /// File-schema indices of the columns `expr` reads
+ projection: Vec<usize>,
+}
+
+/// Helper to build a `FilterCandidate`. This will do several things
+/// 1. Determine the columns required to evaluate the expression
+/// 2. Calculate data required to estimate the cost of evaluating the filter
+/// 3. Rewrite column expressions in the predicate which reference columns not in the particular file schema.
+/// This is relevant in the case where we have determined the table schema by merging all individual file schemas
+/// and any given file may or may not contain all columns in the merged schema. If a particular column is not present
+/// we replace the column expression with a literal expression that produces a null value.
+struct FilterCandidateBuilder<'a> {
+ /// The predicate being analyzed
+ expr: Expr,
+ /// Arrow schema of the parquet file being read
+ file_schema: &'a Schema,
+ /// Schema of the whole table; a column absent from it is treated as a projected column
+ table_schema: &'a Schema,
+ /// File-schema index of each column reference found while visiting `expr`,
+ /// one entry per reference, in visit order
+ required_column_indices: Vec<usize>,
+ /// Set when `expr` references a nested (non-primitive) column of the file
+ non_primitive_columns: bool,
+ /// Set when `expr` references a column in neither the file nor the table schema
+ projected_columns: bool,
+}
+
+impl<'a> FilterCandidateBuilder<'a> {
+    /// Create a builder for `expr`, resolving columns against `file_schema`
+    /// (the parquet file being read) and `table_schema` (the merged table schema).
+    pub fn new(expr: Expr, file_schema: &'a Schema, table_schema: &'a Schema) -> Self {
+        Self {
+            expr,
+            file_schema,
+            table_schema,
+            required_column_indices: vec![],
+            non_primitive_columns: false,
+            projected_columns: false,
+        }
+    }
+
+    /// Analyze and rewrite the expression. Returns `Ok(None)` if it is not
+    /// suitable for pushdown because it reads a non-primitive or projected column.
+    ///
+    /// The returned candidate's `projection` is sorted and free of duplicates.
+    /// `ProjectionMask::roots` hands the predicate its columns in file-schema
+    /// order, while `DatafusionArrowPredicate::try_new` builds the predicate's
+    /// schema with `Schema::project(projection)`, which keeps the given order.
+    /// Any other order would bind column references to the wrong arrays, and a
+    /// repeated index (e.g. `a = 1 OR a IS NULL`) would produce a schema with
+    /// duplicate fields and double-count the column's size.
+    ///
+    /// # Errors
+    /// Propagates errors from rewriting the expression or inspecting column metadata.
+    pub fn build(
+        mut self,
+        metadata: &ParquetMetaData,
+    ) -> Result<Option<FilterCandidate>> {
+        let expr = self.expr.clone();
+        let expr = expr.rewrite(&mut self)?;
+
+        if self.non_primitive_columns || self.projected_columns {
+            return Ok(None);
+        }
+
+        let mut projection = self.required_column_indices;
+        projection.sort_unstable();
+        projection.dedup();
+
+        let required_bytes = size_of_columns(&projection, metadata)?;
+        let can_use_index = columns_sorted(&projection, metadata)?;
+
+        Ok(Some(FilterCandidate {
+            expr,
+            required_bytes,
+            can_use_index,
+            projection,
+        }))
+    }
+}
+
+impl<'a> ExprRewriter for FilterCandidateBuilder<'a> {
+    /// Record each column reference: its file-schema index when the file has the
+    /// column (flagging nested types), otherwise flag it as a projected column
+    /// when the table schema does not know it either.
+    fn pre_visit(&mut self, expr: &Expr) -> Result<RewriteRecursion> {
+        let column = match expr {
+            Expr::Column(column) => column,
+            _ => return Ok(RewriteRecursion::Continue),
+        };
+
+        match self.file_schema.index_of(&column.name) {
+            Ok(idx) => {
+                self.required_column_indices.push(idx);
+                let field = self.file_schema.field(idx);
+                self.non_primitive_columns |= !is_primitive_field(field);
+            }
+            // A column unknown to the (un-projected) table schema must be a
+            // projected column.
+            Err(_) => {
+                let in_table = self.table_schema.index_of(&column.name).is_ok();
+                self.projected_columns |= !in_table;
+            }
+        }
+
+        Ok(RewriteRecursion::Continue)
+    }
+
+    /// Replace a reference to a column this file does not contain with a null literal.
+    fn mutate(&mut self, expr: Expr) -> Result<Expr> {
+        let missing_from_file = matches!(
+            &expr,
+            Expr::Column(Column { name, .. }) if self.file_schema.field_with_name(name).is_err()
+        );
+
+        Ok(if missing_from_file {
+            Expr::Literal(ScalarValue::Null)
+        } else {
+            expr
+        })
+    }
+}
+
+/// Sum the compressed sizes of the given file-schema columns over every row group
+/// in `metadata`. This approximates the file IO needed to evaluate a predicate
+/// reading those columns.
+fn size_of_columns(columns: &[usize], metadata: &ParquetMetaData) -> Result<usize> {
+    let row_groups = metadata.row_groups();
+    let total_size: usize = columns
+        .iter()
+        .flat_map(|&idx| {
+            row_groups
+                .iter()
+                .map(move |rg| rg.column(idx).compressed_size() as usize)
+        })
+        .sum();
+    Ok(total_size)
+}
+
+/// For a given set of `Column`s required for predicate `Expr` determine whether all
+/// columns are sorted. Sorted columns may be queried more efficiently in the presence of
+/// a PageIndex.
+///
+/// NOTE: not implemented yet; this always returns `Ok(false)`, so no predicate is
+/// currently treated as able to use an index.
+fn columns_sorted(_columns: &[usize], _metadata: &ParquetMetaData) -> Result<bool> {
+ // TODO How do we know this?
+ Ok(false)
+}
+
+/// Build a [`RowFilter`] from the given predicate `Expr`.
+///
+/// The predicate is split into its top-level conjuncts (see `uncombine_filter`).
+/// Each conjunct that can be evaluated against `file_schema` becomes one
+/// `ArrowPredicate`. Conjuncts that are unsuitable are left out of the filter,
+/// for example because they reference projected or non-primitive columns, or
+/// because their analysis fails.
+///
+/// If `reorder_predicates` is true, predicates are ordered by the compressed
+/// size of the columns they read (smallest first). Predicates whose columns are
+/// all sorted are placed ahead of the rest. Otherwise the conjunct order is kept.
+///
+/// Returns `Ok(None)` when no conjunct is suitable for pushdown.
+///
+/// # Errors
+/// Returns an error if a suitable predicate cannot be compiled to a physical
+/// expression.
+pub fn build_row_filter(
+    expr: Expr,
+    file_schema: &Schema,
+    table_schema: &Schema,
+    metadata: &ParquetMetaData,
+    reorder_predicates: bool,
+) -> Result<Option<RowFilter>> {
+    // Analysis failures are deliberately not fatal: such a conjunct is simply
+    // not pushed down.
+    let mut candidates: Vec<FilterCandidate> = uncombine_filter(expr)
+        .into_iter()
+        .filter_map(|expr| {
+            FilterCandidateBuilder::new(expr, file_schema, table_schema)
+                .build(metadata)
+                .ok()
+                .flatten()
+        })
+        .collect();
+
+    if candidates.is_empty() {
+        return Ok(None);
+    }
+
+    if reorder_predicates {
+        // `sort_by_key` is stable and `partition` preserves order, so candidates
+        // of equal size keep their conjunct order within each partition.
+        candidates.sort_by_key(|c| c.required_bytes);
+        let (indexed, others): (Vec<_>, Vec<_>) =
+            candidates.into_iter().partition(|c| c.can_use_index);
+        candidates = indexed.into_iter().chain(others).collect();
+    }
+
+    let filters = candidates
+        .into_iter()
+        .map(|candidate| {
+            DatafusionArrowPredicate::try_new(candidate, file_schema, metadata)
+                .map(|filter| Box::new(filter) as Box<dyn ArrowPredicate>)
+        })
+        .collect::<Result<Vec<_>>>()?;
+
+    Ok(Some(RowFilter::new(filters)))
+}
+
+/// Return true if `field` has a non-nested data type, i.e. it is not a list,
+/// fixed-size list, large list, struct, union or map.
+// TODO remove after https://github.com/apache/arrow-rs/issues/2704 is done
+fn is_primitive_field(field: &Field) -> bool {
+    match field.data_type() {
+        DataType::List(_)
+        | DataType::FixedSizeList(_, _)
+        | DataType::LargeList(_)
+        | DataType::Struct(_)
+        | DataType::Union(_, _, _)
+        | DataType::Map(_, _) => false,
+        _ => true,
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use crate::physical_plan::file_format::row_filter::FilterCandidateBuilder;
+    use arrow::datatypes::{DataType, Field, Schema};
+    use datafusion_common::ScalarValue;
+    use datafusion_expr::{col, lit};
+    use parquet::arrow::parquet_to_arrow_schema;
+    use parquet::file::reader::{FileReader, SerializedFileReader};
+
+    /// Open `file_name` from the parquet test data directory and return its reader
+    /// together with the arrow schema derived from the file's metadata.
+    fn open_test_file(file_name: &str) -> (SerializedFileReader<std::fs::File>, Schema) {
+        let path = format!("{}/{}", crate::test_util::parquet_test_data(), file_name);
+        let file = std::fs::File::open(&path).expect("opening file");
+        let reader = SerializedFileReader::new(file).expect("creating reader");
+        let schema = parquet_to_arrow_schema(
+            reader.metadata().file_metadata().schema_descr(),
+            None,
+        )
+        .expect("parsing schema");
+        (reader, schema)
+    }
+
+    // Assume a column expression for a column not in the table schema is a projected column and ignore it
+    #[test]
+    fn test_filter_candidate_builder_ignore_projected_columns() {
+        let (reader, table_schema) = open_test_file("alltypes_plain.parquet");
+        let expr = col("projected_column").eq(lit("value"));
+
+        let candidate = FilterCandidateBuilder::new(expr, &table_schema, &table_schema)
+            .build(reader.metadata())
+            .expect("building candidate");
+
+        assert!(candidate.is_none());
+    }
+
+    // We should ignore predicate that read non-primitive columns
+    #[test]
+    fn test_filter_candidate_builder_ignore_complex_types() {
+        let (reader, table_schema) = open_test_file("list_columns.parquet");
+        let expr = col("int64_list").is_not_null();
+
+        let candidate = FilterCandidateBuilder::new(expr, &table_schema, &table_schema)
+            .build(reader.metadata())
+            .expect("building candidate");
+
+        assert!(candidate.is_none());
+    }
+
+    // If a column exists in the table schema but not the file schema it should be rewritten to a null expression
+    #[test]
+    fn test_filter_candidate_builder_rewrite_missing_column() {
+        let (reader, table_schema) = open_test_file("alltypes_plain.parquet");
+
+        // `int_col` is deliberately left out of this file schema.
+        let file_schema = Schema::new(vec![
+            Field::new("bigint_col", DataType::Int64, true),
+            Field::new("float_col", DataType::Float32, true),
+        ]);
+
+        let expr = col("bigint_col").eq(col("int_col"));
+        let expected_candidate_expr = col("bigint_col").eq(lit(ScalarValue::Null));
+
+        let candidate = FilterCandidateBuilder::new(expr, &file_schema, &table_schema)
+            .build(reader.metadata())
+            .expect("building candidate");
+
+        assert!(candidate.is_some());
+        assert_eq!(candidate.unwrap().expr, expected_candidate_expr);
+    }
+}
diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs
index 6c5cc0ecc..8d89f6f41 100644
--- a/datafusion/expr/src/expr_fn.rs
+++ b/datafusion/expr/src/expr_fn.rs
@@ -452,6 +452,37 @@ pub fn combine_filters(filters: &[Expr]) -> Option<Expr> {
Some(combined_filter)
}
+/// Split a filter made of boolean expressions joined with `AND` into its
+/// individual conjuncts, in left-to-right order. Nested `AND`s are flattened,
+/// so `a AND (b AND c)` yields `[a, b, c]`. Any other expression, including an
+/// `OR`, is returned as the single element. This is the inverse of `combine_filters`.
+pub fn uncombine_filter(filter: Expr) -> Vec<Expr> {
+    // Explicit work stack instead of recursion. Right operands are pushed before
+    // left ones so that conjuncts come out in left-to-right order.
+    let mut pending = vec![filter];
+    let mut conjuncts = Vec::new();
+    while let Some(expr) = pending.pop() {
+        match expr {
+            Expr::BinaryExpr {
+                left,
+                op: Operator::And,
+                right,
+            } => {
+                pending.push(*right);
+                pending.push(*left);
+            }
+            other => conjuncts.push(other),
+        }
+    }
+    conjuncts
+}
+
+/// Join `filters` with logical `OR` into a single expression, folding from the
+/// left so that `[a, b, c]` becomes `(a OR b) OR c`. Returns `None` when
+/// `filters` is empty.
+pub fn combine_filters_disjunctive(filters: &[Expr]) -> Option<Expr> {
+    let (first, rest) = filters.split_first()?;
+    Some(rest.iter().cloned().fold(first.clone(), or))
+}
+
/// Recursively un-alias an expressions
#[inline]
pub fn unalias(expr: Expr) -> Expr {
@@ -521,6 +552,7 @@ pub fn call_fn(name: impl AsRef<str>, args: Vec<Expr>) ->
Result<Expr> {
#[cfg(test)]
mod test {
use super::*;
+ use arrow::datatypes::{Field, Schema};
#[test]
fn filter_is_null_and_is_not_null() {
@@ -698,4 +730,56 @@ mod test {
combine_filters(&[filter1.clone(), filter2.clone(),
filter3.clone()]);
assert_eq!(result, Some(and(and(filter1, filter2), filter3)));
}
+
+    /// Assert that `actual` holds as many predicates as `expected` and that every
+    /// expected predicate appears somewhere in `actual` (order is ignored).
+    fn assert_predicates(actual: Vec<Expr>, expected: Vec<Expr>) {
+        assert_eq!(
+            actual.len(),
+            expected.len(),
+            "Predicates are not equal, found {} predicates but expected {}",
+            actual.len(),
+            expected.len()
+        );
+
+        if let Some(missing) = expected.iter().find(|&expr| !actual.contains(expr)) {
+            panic!(
+                "Predicates are not equal, predicate {:?} not found in {:?}",
+                missing, actual
+            );
+        }
+    }
+
+    #[test]
+    fn test_uncombine_filter() {
+        let _schema = Schema::new(
+            ["a", "b", "c"]
+                .iter()
+                .map(|name| Field::new(name, DataType::Utf8, true))
+                .collect(),
+        );
+
+        // A predicate without a top-level AND comes back as the only element.
+        let predicate = col("a").eq(lit("s"));
+        assert_predicates(uncombine_filter(predicate.clone()), vec![predicate]);
+    }
+
+    #[test]
+    fn test_uncombine_filter_recursively() {
+        let _schema = Schema::new(
+            ["a", "b", "c"]
+                .iter()
+                .map(|name| Field::new(name, DataType::Utf8, true))
+                .collect(),
+        );
+
+        // A plain conjunction is split into its two operands.
+        let conjunction = and(col("a"), col("b"));
+        assert_predicates(uncombine_filter(conjunction), vec![col("a"), col("b")]);
+
+        // A top-level OR is kept whole, even though it contains an AND.
+        let disjunction = col("a").and(col("b")).or(col("c"));
+        assert_predicates(uncombine_filter(disjunction.clone()), vec![disjunction]);
+    }
}