This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 9fbee1ae5 Pushdown `RowFilter` in `ParquetExec` (#3380)
9fbee1ae5 is described below

commit 9fbee1ae5001c91e4ad991ec8918e9eb0359ab3f
Author: Dan Harris <[email protected]>
AuthorDate: Tue Sep 13 15:16:26 2022 -0400

    Pushdown `RowFilter` in `ParquetExec` (#3380)
    
    * Initial implementation
    
    * Remove debugging cruft
    
    * Add license header
    
    * PR comments
    
    * no need to prep null mask
    
    * PR comments
    
    * unit tests
    
    * Fix comment
    
    * fix doc string
    
    * Update datafusion/core/src/physical_plan/file_format/row_filter.rs
    
    Co-authored-by: Andrew Lamb <[email protected]>
    
    * Update datafusion/expr/src/expr_fn.rs
    
    Co-authored-by: Andrew Lamb <[email protected]>
    
    * Update datafusion/core/src/physical_plan/file_format/parquet.rs
    
    Co-authored-by: Andrew Lamb <[email protected]>
    
    * PR comments
    
    * Fix compiler error after rebase
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 datafusion/core/src/physical_optimizer/pruning.rs  |   1 +
 .../core/src/physical_plan/file_format/mod.rs      |   1 +
 .../core/src/physical_plan/file_format/parquet.rs  | 190 ++++++++--
 .../src/physical_plan/file_format/row_filter.rs    | 400 +++++++++++++++++++++
 datafusion/expr/src/expr_fn.rs                     |  84 +++++
 5 files changed, 652 insertions(+), 24 deletions(-)

diff --git a/datafusion/core/src/physical_optimizer/pruning.rs 
b/datafusion/core/src/physical_optimizer/pruning.rs
index 73f6c795c..f5433d7e8 100644
--- a/datafusion/core/src/physical_optimizer/pruning.rs
+++ b/datafusion/core/src/physical_optimizer/pruning.rs
@@ -44,6 +44,7 @@ use arrow::{
     record_batch::RecordBatch,
 };
 use datafusion_expr::expr_rewriter::{ExprRewritable, ExprRewriter};
+
 use datafusion_expr::utils::expr_to_columns;
 use datafusion_expr::{binary_expr, cast, try_cast, ExprSchemable};
 use datafusion_physical_expr::create_physical_expr;
diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs 
b/datafusion/core/src/physical_plan/file_format/mod.rs
index 31fa1f2d8..707ed039c 100644
--- a/datafusion/core/src/physical_plan/file_format/mod.rs
+++ b/datafusion/core/src/physical_plan/file_format/mod.rs
@@ -25,6 +25,7 @@ mod delimited_stream;
 mod file_stream;
 mod json;
 mod parquet;
+mod row_filter;
 
 pub(crate) use self::csv::plan_to_csv;
 pub use self::csv::CsvExec;
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs 
b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 14e679c7f..a6ce44b22 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -29,6 +29,7 @@ use crate::datasource::listing::FileRange;
 use crate::physical_plan::file_format::file_stream::{
     FileOpenFuture, FileOpener, FileStream,
 };
+use crate::physical_plan::file_format::row_filter::build_row_filter;
 use crate::physical_plan::file_format::FileMeta;
 use crate::{
     error::{DataFusionError, Result},
@@ -67,6 +68,30 @@ use parquet::file::{
 };
 use parquet::schema::types::ColumnDescriptor;
 
+#[derive(Debug, Clone, Default)]
+/// Specify options for the parquet scan
+pub struct ParquetScanOptions {
+    /// If true, any available `pruning_predicate` will be converted to a 
`RowFilter`
+    /// and pushed down to the `ParquetRecordBatchStream`. This will enable 
row level
+    /// filter at the decoder level. Defaults to false
+    pushdown_filters: bool,
+    /// If true, the generated `RowFilter` may reorder the predicate `Expr`s 
to try and optimize
+    /// the cost of filter evaluation.
+    reorder_predicates: bool,
+}
+
+impl ParquetScanOptions {
+    pub fn with_pushdown_filters(mut self, pushdown_filters: bool) -> Self {
+        self.pushdown_filters = pushdown_filters;
+        self
+    }
+
+    pub fn with_reorder_predicates(mut self, reorder_predicates: bool) -> Self 
{
+        self.reorder_predicates = reorder_predicates;
+        self
+    }
+}
+
 /// Execution plan for scanning one or more Parquet partitions
 #[derive(Debug, Clone)]
 pub struct ParquetExec {
@@ -81,6 +106,8 @@ pub struct ParquetExec {
     metadata_size_hint: Option<usize>,
     /// Optional user defined parquet file reader factory
     parquet_file_reader_factory: Option<Arc<dyn ParquetFileReaderFactory>>,
+    /// Options to specify behavior of parquet scan
+    scan_options: ParquetScanOptions,
 }
 
 impl ParquetExec {
@@ -121,6 +148,7 @@ impl ParquetExec {
             pruning_predicate,
             metadata_size_hint,
             parquet_file_reader_factory: None,
+            scan_options: ParquetScanOptions::default(),
         }
     }
 
@@ -148,6 +176,12 @@ impl ParquetExec {
         self.parquet_file_reader_factory = Some(parquet_file_reader_factory);
         self
     }
+
+    /// Configure `ParquetScanOptions`
+    pub fn with_scan_options(mut self, scan_options: ParquetScanOptions) -> 
Self {
+        self.scan_options = scan_options;
+        self
+    }
 }
 
 /// Stores metrics about the parquet execution for a particular parquet file.
@@ -258,6 +292,7 @@ impl ExecutionPlan for ParquetExec {
             metadata_size_hint: self.metadata_size_hint,
             metrics: self.metrics.clone(),
             parquet_file_reader_factory,
+            scan_options: self.scan_options.clone(),
         };
 
         let stream = FileStream::new(
@@ -319,6 +354,7 @@ struct ParquetOpener {
     metadata_size_hint: Option<usize>,
     metrics: ExecutionPlanMetricsSet,
     parquet_file_reader_factory: Arc<dyn ParquetFileReaderFactory>,
+    scan_options: ParquetScanOptions,
 }
 
 impl FileOpener for ParquetOpener {
@@ -347,9 +383,12 @@ impl FileOpener for ParquetOpener {
         let batch_size = self.batch_size;
         let projection = self.projection.clone();
         let pruning_predicate = self.pruning_predicate.clone();
+        let table_schema = self.table_schema.clone();
+        let reorder_predicates = self.scan_options.reorder_predicates;
+        let pushdown_filters = self.scan_options.pushdown_filters;
 
         Ok(Box::pin(async move {
-            let builder = ParquetRecordBatchStreamBuilder::new(reader).await?;
+            let mut builder = 
ParquetRecordBatchStreamBuilder::new(reader).await?;
             let adapted_projections =
                 schema_adapter.map_projections(builder.schema(), &projection)?;
 
@@ -358,6 +397,21 @@ impl FileOpener for ParquetOpener {
                 adapted_projections.iter().cloned(),
             );
 
+            if let Some(predicate) = pushdown_filters
+                .then(|| pruning_predicate.as_ref().map(|p| p.logical_expr()))
+                .flatten()
+            {
+                if let Ok(Some(filter)) = build_row_filter(
+                    predicate.clone(),
+                    builder.schema().as_ref(),
+                    table_schema.as_ref(),
+                    builder.metadata(),
+                    reorder_predicates,
+                ) {
+                    builder = builder.with_row_filter(filter);
+                }
+            };
+
             let groups = builder.metadata().row_groups();
             let row_groups =
                 prune_row_groups(groups, file_range, pruning_predicate, 
&metrics);
@@ -839,6 +893,7 @@ mod tests {
         projection: Option<Vec<usize>>,
         schema: Option<SchemaRef>,
         predicate: Option<Expr>,
+        pushdown_predicate: bool,
     ) -> Result<Vec<RecordBatch>> {
         let file_schema = match schema {
             Some(schema) => schema,
@@ -851,7 +906,7 @@ mod tests {
         let file_groups = meta.into_iter().map(Into::into).collect();
 
         // prepare the scan
-        let parquet_exec = ParquetExec::new(
+        let mut parquet_exec = ParquetExec::new(
             FileScanConfig {
                 object_store_url: ObjectStoreUrl::local_filesystem(),
                 file_groups: vec![file_groups],
@@ -865,6 +920,14 @@ mod tests {
             None,
         );
 
+        if pushdown_predicate {
+            parquet_exec = parquet_exec.with_scan_options(
+                ParquetScanOptions::default()
+                    .with_pushdown_filters(true)
+                    .with_reorder_predicates(true),
+            );
+        }
+
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         collect(Arc::new(parquet_exec), task_ctx).await
@@ -912,9 +975,10 @@ mod tests {
         let batch3 = add_to_batch(&batch1, "c3", c3);
 
         // read/write them files:
-        let read = round_trip_to_parquet(vec![batch1, batch2, batch3], None, 
None, None)
-            .await
-            .unwrap();
+        let read =
+            round_trip_to_parquet(vec![batch1, batch2, batch3], None, None, 
None, false)
+                .await
+                .unwrap();
         let expected = vec![
             "+-----+----+----+",
             "| c1  | c2 | c3 |",
@@ -953,7 +1017,7 @@ mod tests {
         let batch2 = create_batch(vec![("c3", c3), ("c2", c2), ("c1", c1)]);
 
         // read/write them files:
-        let read = round_trip_to_parquet(vec![batch1, batch2], None, None, 
None)
+        let read = round_trip_to_parquet(vec![batch1, batch2], None, None, 
None, false)
             .await
             .unwrap();
         let expected = vec![
@@ -987,7 +1051,7 @@ mod tests {
         let batch2 = create_batch(vec![("c3", c3), ("c2", c2)]);
 
         // read/write them files:
-        let read = round_trip_to_parquet(vec![batch1, batch2], None, None, 
None)
+        let read = round_trip_to_parquet(vec![batch1, batch2], None, None, 
None, false)
             .await
             .unwrap();
         let expected = vec![
@@ -1020,24 +1084,60 @@ mod tests {
         // batch2: c3(int8), c2(int64)
         let batch2 = create_batch(vec![("c3", c3), ("c2", c2)]);
 
-        let filter = col("c2").eq(lit(0_i64));
+        let filter = col("c2").eq(lit(2_i64));
 
         // read/write them files:
-        let read = round_trip_to_parquet(vec![batch1, batch2], None, None, 
Some(filter))
-            .await
-            .unwrap();
+        let read =
+            round_trip_to_parquet(vec![batch1, batch2], None, None, 
Some(filter), false)
+                .await
+                .unwrap();
         let expected = vec![
             "+-----+----+----+",
             "| c1  | c3 | c2 |",
             "+-----+----+----+",
-            "| Foo | 10 |    |",
+            "|     |    |    |",
+            "|     | 10 | 1  |",
             "|     | 20 |    |",
+            "|     | 20 | 2  |",
+            "| Foo | 10 |    |",
             "| bar |    |    |",
             "+-----+----+----+",
         ];
         assert_batches_sorted_eq!(expected, &read);
     }
 
+    #[tokio::test]
+    async fn evolved_schema_intersection_filter_with_filter_pushdown() {
+        let c1: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+
+        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), 
None]));
+
+        let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), 
None]));
+
+        // batch1: c1(string), c3(int8)
+        let batch1 = create_batch(vec![("c1", c1), ("c3", c3.clone())]);
+
+        // batch2: c3(int8), c2(int64)
+        let batch2 = create_batch(vec![("c3", c3), ("c2", c2)]);
+
+        let filter = col("c2").eq(lit(2_i64));
+
+        // read/write them files:
+        let read =
+            round_trip_to_parquet(vec![batch1, batch2], None, None, 
Some(filter), true)
+                .await
+                .unwrap();
+        let expected = vec![
+            "+----+----+----+",
+            "| c1 | c3 | c2 |",
+            "+----+----+----+",
+            "|    | 20 | 2  |",
+            "+----+----+----+",
+        ];
+        assert_batches_sorted_eq!(expected, &read);
+    }
+
     #[tokio::test]
     async fn evolved_schema_projection() {
         let c1: ArrayRef =
@@ -1061,10 +1161,15 @@ mod tests {
         let batch2 = create_batch(vec![("c3", c3), ("c2", c2), ("c1", c1), 
("c4", c4)]);
 
         // read/write them files:
-        let read =
-            round_trip_to_parquet(vec![batch1, batch2], Some(vec![0, 3]), 
None, None)
-                .await
-                .unwrap();
+        let read = round_trip_to_parquet(
+            vec![batch1, batch2],
+            Some(vec![0, 3]),
+            None,
+            None,
+            false,
+        )
+        .await
+        .unwrap();
         let expected = vec![
             "+-----+-----+",
             "| c1  | c4  |",
@@ -1102,9 +1207,10 @@ mod tests {
         let filter = col("c3").eq(lit(0_i8));
 
         // read/write them files:
-        let read = round_trip_to_parquet(vec![batch1, batch2], None, None, 
Some(filter))
-            .await
-            .unwrap();
+        let read =
+            round_trip_to_parquet(vec![batch1, batch2], None, None, 
Some(filter), false)
+                .await
+                .unwrap();
 
         // Predicate should prune all row groups
         assert_eq!(read.len(), 0);
@@ -1123,12 +1229,13 @@ mod tests {
         // batch2: c2(int64)
         let batch2 = create_batch(vec![("c2", c2)]);
 
-        let filter = col("c2").eq(lit(0_i64));
+        let filter = col("c2").eq(lit(1_i64));
 
         // read/write them files:
-        let read = round_trip_to_parquet(vec![batch1, batch2], None, None, 
Some(filter))
-            .await
-            .unwrap();
+        let read =
+            round_trip_to_parquet(vec![batch1, batch2], None, None, 
Some(filter), false)
+                .await
+                .unwrap();
 
         // This does not look correct since the "c2" values in the result do 
not in fact match the predicate `c2 == 0`
         // but parquet pruning is not exact. If the min/max values are not 
defined (which they are not in this case since the it is
@@ -1139,14 +1246,48 @@ mod tests {
             "+-----+----+",
             "| c1  | c2 |",
             "+-----+----+",
-            "| Foo |    |",
             "|     |    |",
+            "|     |    |",
+            "|     | 1  |",
+            "|     | 2  |",
+            "| Foo |    |",
             "| bar |    |",
             "+-----+----+",
         ];
         assert_batches_sorted_eq!(expected, &read);
     }
 
+    #[tokio::test]
+    async fn evolved_schema_disjoint_schema_filter_with_pushdown() {
+        let c1: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+
+        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), 
None]));
+
+        // batch1: c1(string)
+        let batch1 = create_batch(vec![("c1", c1.clone())]);
+
+        // batch2: c2(int64)
+        let batch2 = create_batch(vec![("c2", c2)]);
+
+        let filter = col("c2").eq(lit(1_i64));
+
+        // read/write them files:
+        let read =
+            round_trip_to_parquet(vec![batch1, batch2], None, None, 
Some(filter), true)
+                .await
+                .unwrap();
+
+        let expected = vec![
+            "+----+----+",
+            "| c1 | c2 |",
+            "+----+----+",
+            "|    | 1  |",
+            "+----+----+",
+        ];
+        assert_batches_sorted_eq!(expected, &read);
+    }
+
     #[tokio::test]
     async fn evolved_schema_incompatible_types() {
         let c1: ArrayRef =
@@ -1181,6 +1322,7 @@ mod tests {
             None,
             Some(Arc::new(schema)),
             None,
+            false,
         )
         .await;
         assert_contains!(read.unwrap_err().to_string(),
diff --git a/datafusion/core/src/physical_plan/file_format/row_filter.rs 
b/datafusion/core/src/physical_plan/file_format/row_filter.rs
new file mode 100644
index 000000000..56bdba557
--- /dev/null
+++ b/datafusion/core/src/physical_plan/file_format/row_filter.rs
@@ -0,0 +1,400 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::{Array, BooleanArray};
+use arrow::datatypes::{DataType, Field, Schema};
+use arrow::error::{ArrowError, Result as ArrowResult};
+use arrow::record_batch::RecordBatch;
+use datafusion_common::{Column, Result, ScalarValue, ToDFSchema};
+use datafusion_expr::expr_rewriter::{ExprRewritable, ExprRewriter, 
RewriteRecursion};
+
+use datafusion_expr::{uncombine_filter, Expr};
+use datafusion_physical_expr::execution_props::ExecutionProps;
+use datafusion_physical_expr::{create_physical_expr, PhysicalExpr};
+use parquet::arrow::arrow_reader::{ArrowPredicate, RowFilter};
+use parquet::arrow::ProjectionMask;
+use parquet::file::metadata::ParquetMetaData;
+use std::sync::Arc;
+
+/// This module contains utilities for enabling the pushdown of DataFusion 
filter predicates (which
+/// can be any DataFusion `Expr` that evaluates to a `BooleanArray`) to the 
parquet decoder level in `arrow-rs`.
+/// DataFusion will use a `ParquetRecordBatchStream` to read data from parquet 
into arrow `RecordBatch`es.
+/// When constructing the  `ParquetRecordBatchStream` you can provide a 
`RowFilter` which is itself just a vector
+/// of `Box<dyn ArrowPredicate>`. During decoding, the predicates are 
evaluated to generate a mask which is used
+/// to avoid decoding rows in projected columns which are not selected which 
can significantly reduce the amount
+/// of compute required for decoding.
+///
+/// Since the predicates are applied serially in the order defined in the 
`RowFilter`, the optimal ordering
+/// will depend on the exact filters. The best filters to execute first have 
two properties:
+///     1. The are relatively inexpensive to evaluate (e.g. they read column 
chunks which are relatively small)
+///     2. They filter a lot of rows, reducing the amount of decoding required 
for subsequent filters and projected columns
+///
+/// Given the metadata exposed by parquet, the selectivity of filters is not 
easy to estimate so the heuristics we use here primarily
+/// focus on the evaluation cost.
+///
+/// The basic algorithm for constructing the `RowFilter` is as follows
+///     1. Recursively break conjunctions into separate predicates. An 
expression like `a = 1 AND (b = 2 AND c = 3)` would be
+///        separated into the expressions `a = 1`, `b = 2`, and `c = 3`.
+///     2. Determine whether each predicate is suitable as an 
`ArrowPredicate`. As long as the predicate does not reference any projected 
columns
+///        or columns with non-primitive types, then it is considered suitable.
+///     3. Determine, for each predicate, the total compressed size of all 
columns required to evaluate the predicate.
+///     4. Determine, for each predicate, whether all columns required to 
evaluate the expression are sorted.
+///     5. Re-order the predicate by total size (from step 3).
+///     6. Partition the predicates according to whether they are sorted (from 
step 4)
+///     7. "Compile" each predicate `Expr` to a `DatafusionArrowPredicate`.
+///     8. Build the `RowFilter` with the sorted predicates followed by the 
unsorted predicates. Within each partition
+///        the predicates will still be sorted by size.
+
+/// A predicate which can be passed to `ParquetRecordBatchStream` to perform 
row-level
+/// filtering during parquet decoding.
+#[derive(Debug)]
+pub(crate) struct DatafusionArrowPredicate {
+    physical_expr: Arc<dyn PhysicalExpr>,
+    projection: ProjectionMask,
+}
+
+impl DatafusionArrowPredicate {
+    pub fn try_new(
+        candidate: FilterCandidate,
+        schema: &Schema,
+        metadata: &ParquetMetaData,
+    ) -> Result<Self> {
+        let props = ExecutionProps::default();
+
+        let schema = schema.project(&candidate.projection)?;
+        let df_schema = schema.clone().to_dfschema()?;
+
+        let physical_expr =
+            create_physical_expr(&candidate.expr, &df_schema, &schema, 
&props)?;
+
+        Ok(Self {
+            physical_expr,
+            projection: ProjectionMask::roots(
+                metadata.file_metadata().schema_descr(),
+                candidate.projection,
+            ),
+        })
+    }
+}
+
+impl ArrowPredicate for DatafusionArrowPredicate {
+    fn projection(&self) -> &ProjectionMask {
+        &self.projection
+    }
+
+    fn evaluate(&mut self, batch: RecordBatch) -> ArrowResult<BooleanArray> {
+        match self
+            .physical_expr
+            .evaluate(&batch)
+            .map(|v| v.into_array(batch.num_rows()))
+        {
+            Ok(array) => {
+                if let Some(mask) = 
array.as_any().downcast_ref::<BooleanArray>() {
+                    Ok(BooleanArray::from(mask.data().clone()))
+                } else {
+                    Err(ArrowError::ComputeError(
+                        "Unexpected result of predicate evaluation, expected 
BooleanArray".to_owned(),
+                    ))
+                }
+            }
+            Err(e) => Err(ArrowError::ComputeError(format!(
+                "Error evaluating filter predicate: {:?}",
+                e
+            ))),
+        }
+    }
+}
+
+/// A candidate expression for creating a `RowFilter` contains the
+/// expression as well as data to estimate the cost of evaluating
+/// the resulting expression.
+pub(crate) struct FilterCandidate {
+    expr: Expr,
+    required_bytes: usize,
+    can_use_index: bool,
+    projection: Vec<usize>,
+}
+
+/// Helper to build a `FilterCandidate`. This will do several things
+/// 1. Determine the columns required to evaluate the expression
+/// 2. Calculate data required to estimate the cost of evaluating the filter
+/// 3. Rewrite column expressions in the predicate which reference columns not 
in the particular file schema.
+///    This is relevant in the case where we have determined the table schema 
by merging all individual file schemas
+///    and any given file may or may not contain all columns in the merged 
schema. If a particular column is not present
+///    we replace the column expression with a literal expression that 
produces a null value.
+struct FilterCandidateBuilder<'a> {
+    expr: Expr,
+    file_schema: &'a Schema,
+    table_schema: &'a Schema,
+    required_column_indices: Vec<usize>,
+    non_primitive_columns: bool,
+    projected_columns: bool,
+}
+
+impl<'a> FilterCandidateBuilder<'a> {
+    pub fn new(expr: Expr, file_schema: &'a Schema, table_schema: &'a Schema) 
-> Self {
+        Self {
+            expr,
+            file_schema,
+            table_schema,
+            required_column_indices: vec![],
+            non_primitive_columns: false,
+            projected_columns: false,
+        }
+    }
+
+    pub fn build(
+        mut self,
+        metadata: &ParquetMetaData,
+    ) -> Result<Option<FilterCandidate>> {
+        let expr = self.expr.clone();
+        let expr = expr.rewrite(&mut self)?;
+
+        if self.non_primitive_columns || self.projected_columns {
+            Ok(None)
+        } else {
+            let required_bytes =
+                size_of_columns(&self.required_column_indices, metadata)?;
+            let can_use_index = columns_sorted(&self.required_column_indices, 
metadata)?;
+
+            Ok(Some(FilterCandidate {
+                expr,
+                required_bytes,
+                can_use_index,
+                projection: self.required_column_indices,
+            }))
+        }
+    }
+}
+
+impl<'a> ExprRewriter for FilterCandidateBuilder<'a> {
+    fn pre_visit(&mut self, expr: &Expr) -> Result<RewriteRecursion> {
+        if let Expr::Column(column) = expr {
+            if let Ok(idx) = self.file_schema.index_of(&column.name) {
+                self.required_column_indices.push(idx);
+
+                if !is_primitive_field(self.file_schema.field(idx)) {
+                    self.non_primitive_columns = true;
+                }
+            } else if self.table_schema.index_of(&column.name).is_err() {
+                // If the column does not exist in the (un-projected) table 
schema then
+                // it must be a projected column.
+                self.projected_columns = true;
+            }
+        }
+        Ok(RewriteRecursion::Continue)
+    }
+
+    fn mutate(&mut self, expr: Expr) -> Result<Expr> {
+        if let Expr::Column(Column { name, .. }) = &expr {
+            if self.file_schema.field_with_name(name).is_err() {
+                return Ok(Expr::Literal(ScalarValue::Null));
+            }
+        }
+
+        Ok(expr)
+    }
+}
+
+/// Calculate the total compressed size of all `Column's required for
+/// predicate `Expr`. This should represent the total amount of file IO
+/// required to evaluate the predicate.
+fn size_of_columns(columns: &[usize], metadata: &ParquetMetaData) -> 
Result<usize> {
+    let mut total_size = 0;
+    let row_groups = metadata.row_groups();
+    for idx in columns {
+        for rg in row_groups.iter() {
+            total_size += rg.column(*idx).compressed_size() as usize;
+        }
+    }
+
+    Ok(total_size)
+}
+
+/// For a given set of `Column`s required for predicate `Expr` determine 
whether all
+/// columns are sorted. Sorted columns may be queried more efficiently in the 
presence of
+/// a PageIndex.
+fn columns_sorted(_columns: &[usize], _metadata: &ParquetMetaData) -> 
Result<bool> {
+    // TODO How do we know this?
+    Ok(false)
+}
+
+/// Build a [`RowFilter`] from the given predicate `Expr`
+pub fn build_row_filter(
+    expr: Expr,
+    file_schema: &Schema,
+    table_schema: &Schema,
+    metadata: &ParquetMetaData,
+    reorder_predicates: bool,
+) -> Result<Option<RowFilter>> {
+    let predicates = uncombine_filter(expr);
+
+    let mut candidates: Vec<FilterCandidate> = predicates
+        .into_iter()
+        .flat_map(|expr| {
+            if let Ok(candidate) =
+                FilterCandidateBuilder::new(expr, file_schema, table_schema)
+                    .build(metadata)
+            {
+                candidate
+            } else {
+                None
+            }
+        })
+        .collect();
+
+    if candidates.is_empty() {
+        Ok(None)
+    } else if reorder_predicates {
+        candidates.sort_by_key(|c| c.required_bytes);
+
+        let (indexed_candidates, other_candidates): (Vec<_>, Vec<_>) =
+            candidates.into_iter().partition(|c| c.can_use_index);
+
+        let mut filters: Vec<Box<dyn ArrowPredicate>> = vec![];
+
+        for candidate in indexed_candidates {
+            let filter =
+                DatafusionArrowPredicate::try_new(candidate, file_schema, 
metadata)?;
+
+            filters.push(Box::new(filter));
+        }
+
+        for candidate in other_candidates {
+            let filter =
+                DatafusionArrowPredicate::try_new(candidate, file_schema, 
metadata)?;
+
+            filters.push(Box::new(filter));
+        }
+
+        Ok(Some(RowFilter::new(filters)))
+    } else {
+        let mut filters: Vec<Box<dyn ArrowPredicate>> = vec![];
+        for candidate in candidates {
+            let filter =
+                DatafusionArrowPredicate::try_new(candidate, file_schema, 
metadata)?;
+
+            filters.push(Box::new(filter));
+        }
+
+        Ok(Some(RowFilter::new(filters)))
+    }
+}
+
+/// return true if this is a non nested type.
+// TODO remove after https://github.com/apache/arrow-rs/issues/2704 is done
+fn is_primitive_field(field: &Field) -> bool {
+    !matches!(
+        field.data_type(),
+        DataType::List(_)
+            | DataType::FixedSizeList(_, _)
+            | DataType::LargeList(_)
+            | DataType::Struct(_)
+            | DataType::Union(_, _, _)
+            | DataType::Map(_, _)
+    )
+}
+
+#[cfg(test)]
+mod test {
+    use crate::physical_plan::file_format::row_filter::FilterCandidateBuilder;
+    use arrow::datatypes::{DataType, Field, Schema};
+    use datafusion_common::ScalarValue;
+    use datafusion_expr::{col, lit};
+    use parquet::arrow::parquet_to_arrow_schema;
+    use parquet::file::reader::{FileReader, SerializedFileReader};
+
+    // Assume a column expression for a column not in the table schema is a 
projected column and ignore it
+    #[test]
+    fn test_filter_candidate_builder_ignore_projected_columns() {
+        let testdata = crate::test_util::parquet_test_data();
+        let file = std::fs::File::open(&format!("{}/alltypes_plain.parquet", 
testdata))
+            .expect("opening file");
+
+        let reader = SerializedFileReader::new(file).expect("creating reader");
+
+        let metadata = reader.metadata();
+
+        let table_schema =
+            parquet_to_arrow_schema(metadata.file_metadata().schema_descr(), 
None)
+                .expect("parsing schema");
+
+        let expr = col("projected_column").eq(lit("value"));
+
+        let candidate = FilterCandidateBuilder::new(expr, &table_schema, 
&table_schema)
+            .build(metadata)
+            .expect("building candidate");
+
+        assert!(candidate.is_none());
+    }
+
+    // We should ignore predicate that read non-primitive columns
+    #[test]
+    fn test_filter_candidate_builder_ignore_complex_types() {
+        let testdata = crate::test_util::parquet_test_data();
+        let file = std::fs::File::open(&format!("{}/list_columns.parquet", 
testdata))
+            .expect("opening file");
+
+        let reader = SerializedFileReader::new(file).expect("creating reader");
+
+        let metadata = reader.metadata();
+
+        let table_schema =
+            parquet_to_arrow_schema(metadata.file_metadata().schema_descr(), 
None)
+                .expect("parsing schema");
+
+        let expr = col("int64_list").is_not_null();
+
+        let candidate = FilterCandidateBuilder::new(expr, &table_schema, 
&table_schema)
+            .build(metadata)
+            .expect("building candidate");
+
+        assert!(candidate.is_none());
+    }
+
+    // If a column exists in the table schema but not the file schema it 
should be rewritten to a null expression
+    #[test]
+    fn test_filter_candidate_builder_rewrite_missing_column() {
+        let testdata = crate::test_util::parquet_test_data();
+        let file = std::fs::File::open(&format!("{}/alltypes_plain.parquet", 
testdata))
+            .expect("opening file");
+
+        let reader = SerializedFileReader::new(file).expect("creating reader");
+
+        let metadata = reader.metadata();
+
+        let table_schema =
+            parquet_to_arrow_schema(metadata.file_metadata().schema_descr(), 
None)
+                .expect("parsing schema");
+
+        let file_schema = Schema::new(vec![
+            Field::new("bigint_col", DataType::Int64, true),
+            Field::new("float_col", DataType::Float32, true),
+        ]);
+
+        let expr = col("bigint_col").eq(col("int_col"));
+        let expected_candidate_expr = 
col("bigint_col").eq(lit(ScalarValue::Null));
+
+        let candidate = FilterCandidateBuilder::new(expr, &file_schema, 
&table_schema)
+            .build(metadata)
+            .expect("building candidate");
+
+        assert!(candidate.is_some());
+
+        assert_eq!(candidate.unwrap().expr, expected_candidate_expr);
+    }
+}
diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs
index 6c5cc0ecc..8d89f6f41 100644
--- a/datafusion/expr/src/expr_fn.rs
+++ b/datafusion/expr/src/expr_fn.rs
@@ -452,6 +452,37 @@ pub fn combine_filters(filters: &[Expr]) -> Option<Expr> {
     Some(combined_filter)
 }
 
+/// Take combined filter (multiple boolean expressions ANDed together)
+/// and break down into distinct filters. This should be the inverse of
+/// `combine_filters`
+pub fn uncombine_filter(filter: Expr) -> Vec<Expr> {
+    match filter {
+        Expr::BinaryExpr {
+            left,
+            op: Operator::And,
+            right,
+        } => {
+            let mut exprs = uncombine_filter(*left);
+            exprs.extend(uncombine_filter(*right));
+            exprs
+        }
+        expr => {
+            vec![expr]
+        }
+    }
+}
+
+/// Combines an array of filter expressions into a single filter expression
+/// consisting of the input filter expressions joined with logical OR.
+/// Returns None if the filters array is empty.
+pub fn combine_filters_disjunctive(filters: &[Expr]) -> Option<Expr> {
+    if filters.is_empty() {
+        return None;
+    }
+
+    filters.iter().cloned().reduce(or)
+}
+
 /// Recursively un-alias an expressions
 #[inline]
 pub fn unalias(expr: Expr) -> Expr {
@@ -521,6 +552,7 @@ pub fn call_fn(name: impl AsRef<str>, args: Vec<Expr>) -> 
Result<Expr> {
 #[cfg(test)]
 mod test {
     use super::*;
+    use arrow::datatypes::{Field, Schema};
 
     #[test]
     fn filter_is_null_and_is_not_null() {
@@ -698,4 +730,56 @@ mod test {
             combine_filters(&[filter1.clone(), filter2.clone(), 
filter3.clone()]);
         assert_eq!(result, Some(and(and(filter1, filter2), filter3)));
     }
+
+    fn assert_predicates(actual: Vec<Expr>, expected: Vec<Expr>) {
+        assert_eq!(
+            actual.len(),
+            expected.len(),
+            "Predicates are not equal, found {} predicates but expected {}",
+            actual.len(),
+            expected.len()
+        );
+
+        for expr in expected.into_iter() {
+            assert!(
+                actual.contains(&expr),
+                "Predicates are not equal, predicate {:?} not found in {:?}",
+                expr,
+                actual
+            );
+        }
+    }
+
+    #[test]
+    fn test_uncombine_filter() {
+        let _schema = Schema::new(vec![
+            Field::new("a", DataType::Utf8, true),
+            Field::new("b", DataType::Utf8, true),
+            Field::new("c", DataType::Utf8, true),
+        ]);
+
+        let expr = col("a").eq(lit("s"));
+        let actual = uncombine_filter(expr);
+
+        assert_predicates(actual, vec![col("a").eq(lit("s"))]);
+    }
+
+    #[test]
+    fn test_uncombine_filter_recursively() {
+        let _schema = Schema::new(vec![
+            Field::new("a", DataType::Utf8, true),
+            Field::new("b", DataType::Utf8, true),
+            Field::new("c", DataType::Utf8, true),
+        ]);
+
+        let expr = and(col("a"), col("b"));
+        let actual = uncombine_filter(expr);
+
+        assert_predicates(actual, vec![col("a"), col("b")]);
+
+        let expr = col("a").and(col("b")).or(col("c"));
+        let actual = uncombine_filter(expr.clone());
+
+        assert_predicates(actual, vec![expr]);
+    }
 }

Reply via email to