This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 70256baefa feat: Update Parquet row filtering to handle type coercion 
(#10716)
70256baefa is described below

commit 70256baefa7640a94d45749981e6509cafc3109b
Author: Jeffrey Smith II <[email protected]>
AuthorDate: Wed Jun 5 12:17:11 2024 -0400

    feat: Update Parquet row filtering to handle type coercion (#10716)
    
    * test: Add a failing test to show the lack of type coercion in row filters
    
    * feat: update parquet row filter to handle type coercion
    
    * chore: lint/fmt
    
    * chore: test improvements and cleanup
---
 .../src/datasource/physical_plan/parquet/opener.rs |   1 +
 .../datasource/physical_plan/parquet/row_filter.rs | 120 +++++++++++++++++++--
 datafusion/core/src/datasource/schema_adapter.rs   |  46 +++++++-
 3 files changed, 156 insertions(+), 11 deletions(-)

diff --git a/datafusion/core/src/datasource/physical_plan/parquet/opener.rs 
b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs
index 3aec1e1d20..5fb21975df 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/opener.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs
@@ -114,6 +114,7 @@ impl FileOpener for ParquetOpener {
                     builder.metadata(),
                     reorder_predicates,
                     &file_metrics,
+                    Arc::clone(&schema_mapping),
                 );
 
                 match row_filter {
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs 
b/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs
index 5f89ff087f..18c6c51d28 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs
@@ -18,13 +18,15 @@
 use std::collections::BTreeSet;
 use std::sync::Arc;
 
-use super::ParquetFileMetrics;
-use crate::physical_plan::metrics;
-
 use arrow::array::BooleanArray;
 use arrow::datatypes::{DataType, Schema};
 use arrow::error::{ArrowError, Result as ArrowResult};
 use arrow::record_batch::RecordBatch;
+use parquet::arrow::arrow_reader::{ArrowPredicate, RowFilter};
+use parquet::arrow::ProjectionMask;
+use parquet::file::metadata::ParquetMetaData;
+
+use crate::datasource::schema_adapter::SchemaMapper;
 use datafusion_common::cast::as_boolean_array;
 use datafusion_common::tree_node::{
     Transformed, TransformedResult, TreeNode, TreeNodeRecursion, 
TreeNodeRewriter,
@@ -34,9 +36,9 @@ use datafusion_physical_expr::expressions::{Column, Literal};
 use datafusion_physical_expr::utils::reassign_predicate_columns;
 use datafusion_physical_expr::{split_conjunction, PhysicalExpr};
 
-use parquet::arrow::arrow_reader::{ArrowPredicate, RowFilter};
-use parquet::arrow::ProjectionMask;
-use parquet::file::metadata::ParquetMetaData;
+use crate::physical_plan::metrics;
+
+use super::ParquetFileMetrics;
 
 /// This module contains utilities for enabling the pushdown of DataFusion 
filter predicates (which
 /// can be any DataFusion `Expr` that evaluates to a `BooleanArray`) to the 
parquet decoder level in `arrow-rs`.
@@ -78,6 +80,8 @@ pub(crate) struct DatafusionArrowPredicate {
     rows_filtered: metrics::Count,
     /// how long was spent evaluating this predicate
     time: metrics::Time,
+    /// used to perform type coercion while filtering rows
+    schema_mapping: Arc<dyn SchemaMapper>,
 }
 
 impl DatafusionArrowPredicate {
@@ -87,6 +91,7 @@ impl DatafusionArrowPredicate {
         metadata: &ParquetMetaData,
         rows_filtered: metrics::Count,
         time: metrics::Time,
+        schema_mapping: Arc<dyn SchemaMapper>,
     ) -> Result<Self> {
         let schema = Arc::new(schema.project(&candidate.projection)?);
         let physical_expr = reassign_predicate_columns(candidate.expr, 
&schema, true)?;
@@ -108,6 +113,7 @@ impl DatafusionArrowPredicate {
             ),
             rows_filtered,
             time,
+            schema_mapping,
         })
     }
 }
@@ -123,6 +129,8 @@ impl ArrowPredicate for DatafusionArrowPredicate {
             false => batch.project(&self.projection)?,
         };
 
+        let batch = self.schema_mapping.map_partial_batch(batch)?;
+
         // scoped timer updates on drop
         let mut timer = self.time.timer();
         match self
@@ -323,6 +331,7 @@ pub fn build_row_filter(
     metadata: &ParquetMetaData,
     reorder_predicates: bool,
     file_metrics: &ParquetFileMetrics,
+    schema_mapping: Arc<dyn SchemaMapper>,
 ) -> Result<Option<RowFilter>> {
     let rows_filtered = &file_metrics.pushdown_rows_filtered;
     let time = &file_metrics.pushdown_eval_time;
@@ -360,6 +369,7 @@ pub fn build_row_filter(
                 metadata,
                 rows_filtered.clone(),
                 time.clone(),
+                Arc::clone(&schema_mapping),
             )?;
 
             filters.push(Box::new(filter));
@@ -372,6 +382,7 @@ pub fn build_row_filter(
                 metadata,
                 rows_filtered.clone(),
                 time.clone(),
+                Arc::clone(&schema_mapping),
             )?;
 
             filters.push(Box::new(filter));
@@ -387,6 +398,7 @@ pub fn build_row_filter(
                 metadata,
                 rows_filtered.clone(),
                 time.clone(),
+                Arc::clone(&schema_mapping),
             )?;
 
             filters.push(Box::new(filter));
@@ -398,15 +410,23 @@ pub fn build_row_filter(
 
 #[cfg(test)]
 mod test {
-    use super::*;
     use arrow::datatypes::Field;
+    use arrow_schema::TimeUnit::Nanosecond;
+    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
+    use parquet::arrow::parquet_to_arrow_schema;
+    use parquet::file::reader::{FileReader, SerializedFileReader};
+    use rand::prelude::*;
+
+    use crate::datasource::schema_adapter::DefaultSchemaAdapterFactory;
+    use crate::datasource::schema_adapter::SchemaAdapterFactory;
+
     use datafusion_common::ToDFSchema;
     use datafusion_expr::execution_props::ExecutionProps;
     use datafusion_expr::{cast, col, lit, Expr};
     use datafusion_physical_expr::create_physical_expr;
-    use parquet::arrow::parquet_to_arrow_schema;
-    use parquet::file::reader::{FileReader, SerializedFileReader};
-    use rand::prelude::*;
+    use datafusion_physical_plan::metrics::{Count, Time};
+
+    use super::*;
 
     // We should ignore predicate that read non-primitive columns
     #[test]
@@ -473,6 +493,86 @@ mod test {
         );
     }
 
+    #[test]
+    fn test_filter_type_coercion() {
+        let testdata = crate::test_util::parquet_test_data();
+        let file = 
std::fs::File::open(format!("{testdata}/alltypes_plain.parquet"))
+            .expect("opening file");
+
+        let parquet_reader_builder =
+            ParquetRecordBatchReaderBuilder::try_new(file).expect("creating 
reader");
+        let metadata = parquet_reader_builder.metadata().clone();
+        let file_schema = parquet_reader_builder.schema().clone();
+
+        // This is the schema we would like to coerce to,
+        // which is different from the physical schema of the file.
+        let table_schema = Schema::new(vec![Field::new(
+            "timestamp_col",
+            DataType::Timestamp(Nanosecond, Some(Arc::from("UTC"))),
+            false,
+        )]);
+
+        let schema_adapter =
+            DefaultSchemaAdapterFactory 
{}.create(Arc::new(table_schema.clone()));
+        let (schema_mapping, _) = schema_adapter
+            .map_schema(&file_schema)
+            .expect("creating schema mapping");
+
+        let mut parquet_reader = 
parquet_reader_builder.build().expect("building reader");
+
+        // Parquet file is small, we only need 1 recordbatch
+        let first_rb = parquet_reader
+            .next()
+            .expect("expected record batch")
+            .expect("expected error free record batch");
+
+        // Test all should fail
+        let expr = col("timestamp_col").lt(Expr::Literal(
+            ScalarValue::TimestampNanosecond(Some(1), Some(Arc::from("UTC"))),
+        ));
+        let expr = logical2physical(&expr, &table_schema);
+        let candidate = FilterCandidateBuilder::new(expr, &file_schema, 
&table_schema)
+            .build(&metadata)
+            .expect("building candidate")
+            .expect("candidate expected");
+
+        let mut row_filter = DatafusionArrowPredicate::try_new(
+            candidate,
+            &file_schema,
+            &metadata,
+            Count::new(),
+            Time::new(),
+            Arc::clone(&schema_mapping),
+        )
+        .expect("creating filter predicate");
+
+        let filtered = row_filter.evaluate(first_rb.clone());
+        assert!(matches!(filtered, Ok(a) if a == 
BooleanArray::from(vec![false; 8])));
+
+        // Test all should pass
+        let expr = col("timestamp_col").gt(Expr::Literal(
+            ScalarValue::TimestampNanosecond(Some(0), Some(Arc::from("UTC"))),
+        ));
+        let expr = logical2physical(&expr, &table_schema);
+        let candidate = FilterCandidateBuilder::new(expr, &file_schema, 
&table_schema)
+            .build(&metadata)
+            .expect("building candidate")
+            .expect("candidate expected");
+
+        let mut row_filter = DatafusionArrowPredicate::try_new(
+            candidate,
+            &file_schema,
+            &metadata,
+            Count::new(),
+            Time::new(),
+            schema_mapping,
+        )
+        .expect("creating filter predicate");
+
+        let filtered = row_filter.evaluate(first_rb);
+        assert!(matches!(filtered, Ok(a) if a == BooleanArray::from(vec![true; 
8])));
+    }
+
     #[test]
     fn test_remap_projection() {
         let mut rng = thread_rng();
diff --git a/datafusion/core/src/datasource/schema_adapter.rs 
b/datafusion/core/src/datasource/schema_adapter.rs
index 77fde608fd..e8b64e9090 100644
--- a/datafusion/core/src/datasource/schema_adapter.rs
+++ b/datafusion/core/src/datasource/schema_adapter.rs
@@ -75,9 +75,21 @@ pub trait SchemaAdapter: Send + Sync {
 
 /// Creates a `SchemaMapping` that can be used to cast or map the columns
 /// from the file schema to the table schema.
-pub trait SchemaMapper: Send + Sync {
+pub trait SchemaMapper: Debug + Send + Sync {
     /// Adapts a `RecordBatch` to match the `table_schema` using the stored 
mapping and conversions.
     fn map_batch(&self, batch: RecordBatch) -> 
datafusion_common::Result<RecordBatch>;
+
+    /// Adapts a [`RecordBatch`] that does not  have all the columns from the
+    /// file schema.
+    ///
+    /// This method is used when applying a filter to a subset of the columns 
during
+    /// an `ArrowPredicate`.
+    ///
+    /// This method is slower than `map_batch` as it looks up columns by name.
+    fn map_partial_batch(
+        &self,
+        batch: RecordBatch,
+    ) -> datafusion_common::Result<RecordBatch>;
 }
 
 #[derive(Clone, Debug, Default)]
@@ -185,6 +197,31 @@ impl SchemaMapper for SchemaMapping {
         let record_batch = RecordBatch::try_new_with_options(schema, cols, 
&options)?;
         Ok(record_batch)
     }
+
+    fn map_partial_batch(
+        &self,
+        batch: RecordBatch,
+    ) -> datafusion_common::Result<RecordBatch> {
+        let batch_cols = batch.columns().to_vec();
+        let schema = batch.schema();
+
+        let mut cols = vec![];
+        let mut fields = vec![];
+        for (i, f) in schema.fields().iter().enumerate() {
+            let table_field = self.table_schema.field_with_name(f.name());
+            if let Ok(tf) = table_field {
+                cols.push(cast(&batch_cols[i], tf.data_type())?);
+                fields.push(tf.clone());
+            }
+        }
+
+        // Necessary to handle empty batches
+        let options = 
RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
+
+        let schema = Arc::new(Schema::new(fields));
+        let record_batch = RecordBatch::try_new_with_options(schema, cols, 
&options)?;
+        Ok(record_batch)
+    }
 }
 
 #[cfg(test)]
@@ -337,5 +374,12 @@ mod tests {
 
             Ok(RecordBatch::try_new(schema, new_columns).unwrap())
         }
+
+        fn map_partial_batch(
+            &self,
+            batch: RecordBatch,
+        ) -> datafusion_common::Result<RecordBatch> {
+            self.map_batch(batch)
+        }
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to