This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 70256baefa feat: Update Parquet row filtering to handle type coercion (#10716)
70256baefa is described below
commit 70256baefa7640a94d45749981e6509cafc3109b
Author: Jeffrey Smith II <[email protected]>
AuthorDate: Wed Jun 5 12:17:11 2024 -0400
feat: Update Parquet row filtering to handle type coercion (#10716)
* test: Add a failing test to show the lack of type coercion in row filters
* feat: update parquet row filter to handle type coercion
* chore: lint/fmt
* chore: test improvements and cleanup
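As a rough, self-contained illustration of the coercion this change performs (arrow-rs only; the column values and types below are made up for the example, not taken from the commit): the file may store a timestamp column with a different unit or timezone than the table schema declares, so the predicate's input batch must be cast before the filter is evaluated.

use std::sync::Arc;
use arrow::array::{ArrayRef, TimestampMillisecondArray};
use arrow::compute::cast;
use arrow::datatypes::{DataType, TimeUnit};

fn main() -> Result<(), arrow::error::ArrowError> {
    // File column as decoded from parquet: Timestamp(Millisecond, None).
    let file_col: ArrayRef = Arc::new(TimestampMillisecondArray::from(vec![1, 2]));
    // Table schema declares Timestamp(Nanosecond, Some("UTC")); cast before filtering.
    let table_type = DataType::Timestamp(TimeUnit::Nanosecond, Some(Arc::from("UTC")));
    let coerced = cast(&file_col, &table_type)?;
    assert_eq!(coerced.data_type(), &table_type);
    Ok(())
}

Without such a cast, comparing the decoded column against a literal planned for the table type fails inside the parquet decoder's row filter, which is the bug the commit addresses.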
---
.../src/datasource/physical_plan/parquet/opener.rs | 1 +
.../datasource/physical_plan/parquet/row_filter.rs | 120 +++++++++++++++++++--
datafusion/core/src/datasource/schema_adapter.rs | 46 +++++++-
3 files changed, 156 insertions(+), 11 deletions(-)
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/opener.rs b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs
index 3aec1e1d20..5fb21975df 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/opener.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs
@@ -114,6 +114,7 @@ impl FileOpener for ParquetOpener {
builder.metadata(),
reorder_predicates,
&file_metrics,
+ Arc::clone(&schema_mapping),
);
match row_filter {
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs
index 5f89ff087f..18c6c51d28 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs
@@ -18,13 +18,15 @@
use std::collections::BTreeSet;
use std::sync::Arc;
-use super::ParquetFileMetrics;
-use crate::physical_plan::metrics;
-
use arrow::array::BooleanArray;
use arrow::datatypes::{DataType, Schema};
use arrow::error::{ArrowError, Result as ArrowResult};
use arrow::record_batch::RecordBatch;
+use parquet::arrow::arrow_reader::{ArrowPredicate, RowFilter};
+use parquet::arrow::ProjectionMask;
+use parquet::file::metadata::ParquetMetaData;
+
+use crate::datasource::schema_adapter::SchemaMapper;
use datafusion_common::cast::as_boolean_array;
use datafusion_common::tree_node::{
Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeRewriter,
};
@@ -34,9 +36,9 @@ use datafusion_physical_expr::expressions::{Column, Literal};
use datafusion_physical_expr::utils::reassign_predicate_columns;
use datafusion_physical_expr::{split_conjunction, PhysicalExpr};
-use parquet::arrow::arrow_reader::{ArrowPredicate, RowFilter};
-use parquet::arrow::ProjectionMask;
-use parquet::file::metadata::ParquetMetaData;
+use crate::physical_plan::metrics;
+
+use super::ParquetFileMetrics;
/// This module contains utilities for enabling the pushdown of DataFusion filter predicates (which
/// can be any DataFusion `Expr` that evaluates to a `BooleanArray`) to the parquet decoder level in `arrow-rs`.
@@ -78,6 +80,8 @@ pub(crate) struct DatafusionArrowPredicate {
rows_filtered: metrics::Count,
/// how long was spent evaluating this predicate
time: metrics::Time,
+ /// used to perform type coercion while filtering rows
+ schema_mapping: Arc<dyn SchemaMapper>,
}
impl DatafusionArrowPredicate {
@@ -87,6 +91,7 @@ impl DatafusionArrowPredicate {
metadata: &ParquetMetaData,
rows_filtered: metrics::Count,
time: metrics::Time,
+ schema_mapping: Arc<dyn SchemaMapper>,
) -> Result<Self> {
let schema = Arc::new(schema.project(&candidate.projection)?);
let physical_expr = reassign_predicate_columns(candidate.expr, &schema, true)?;
@@ -108,6 +113,7 @@ impl DatafusionArrowPredicate {
),
rows_filtered,
time,
+ schema_mapping,
})
}
}
@@ -123,6 +129,8 @@ impl ArrowPredicate for DatafusionArrowPredicate {
false => batch.project(&self.projection)?,
};
+ let batch = self.schema_mapping.map_partial_batch(batch)?;
+
// scoped timer updates on drop
let mut timer = self.time.timer();
match self
@@ -323,6 +331,7 @@ pub fn build_row_filter(
metadata: &ParquetMetaData,
reorder_predicates: bool,
file_metrics: &ParquetFileMetrics,
+ schema_mapping: Arc<dyn SchemaMapper>,
) -> Result<Option<RowFilter>> {
let rows_filtered = &file_metrics.pushdown_rows_filtered;
let time = &file_metrics.pushdown_eval_time;
@@ -360,6 +369,7 @@ pub fn build_row_filter(
metadata,
rows_filtered.clone(),
time.clone(),
+ Arc::clone(&schema_mapping),
)?;
filters.push(Box::new(filter));
@@ -372,6 +382,7 @@ pub fn build_row_filter(
metadata,
rows_filtered.clone(),
time.clone(),
+ Arc::clone(&schema_mapping),
)?;
filters.push(Box::new(filter));
@@ -387,6 +398,7 @@ pub fn build_row_filter(
metadata,
rows_filtered.clone(),
time.clone(),
+ Arc::clone(&schema_mapping),
)?;
filters.push(Box::new(filter));
@@ -398,15 +410,23 @@ pub fn build_row_filter(
#[cfg(test)]
mod test {
- use super::*;
use arrow::datatypes::Field;
+ use arrow_schema::TimeUnit::Nanosecond;
+ use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
+ use parquet::arrow::parquet_to_arrow_schema;
+ use parquet::file::reader::{FileReader, SerializedFileReader};
+ use rand::prelude::*;
+
+ use crate::datasource::schema_adapter::DefaultSchemaAdapterFactory;
+ use crate::datasource::schema_adapter::SchemaAdapterFactory;
+
use datafusion_common::ToDFSchema;
use datafusion_expr::execution_props::ExecutionProps;
use datafusion_expr::{cast, col, lit, Expr};
use datafusion_physical_expr::create_physical_expr;
- use parquet::arrow::parquet_to_arrow_schema;
- use parquet::file::reader::{FileReader, SerializedFileReader};
- use rand::prelude::*;
+ use datafusion_physical_plan::metrics::{Count, Time};
+
+ use super::*;
// We should ignore predicates that read non-primitive columns
#[test]
@@ -473,6 +493,86 @@ mod test {
);
}
+ #[test]
+ fn test_filter_type_coercion() {
+ let testdata = crate::test_util::parquet_test_data();
+ let file = std::fs::File::open(format!("{testdata}/alltypes_plain.parquet"))
+ .expect("opening file");
+
+ let parquet_reader_builder =
+ ParquetRecordBatchReaderBuilder::try_new(file).expect("creating reader");
+ let metadata = parquet_reader_builder.metadata().clone();
+ let file_schema = parquet_reader_builder.schema().clone();
+
+ // This is the schema we would like to coerce to,
+ // which is different from the physical schema of the file.
+ let table_schema = Schema::new(vec![Field::new(
+ "timestamp_col",
+ DataType::Timestamp(Nanosecond, Some(Arc::from("UTC"))),
+ false,
+ )]);
+
+ let schema_adapter =
+ DefaultSchemaAdapterFactory {}.create(Arc::new(table_schema.clone()));
+ let (schema_mapping, _) = schema_adapter
+ .map_schema(&file_schema)
+ .expect("creating schema mapping");
+
+ let mut parquet_reader = parquet_reader_builder.build().expect("building reader");
+
+ // Parquet file is small, so we only need one RecordBatch
+ let first_rb = parquet_reader
+ .next()
+ .expect("expected record batch")
+ .expect("expected error free record batch");
+
+ // Test all should fail
+ let expr = col("timestamp_col").lt(Expr::Literal(
+ ScalarValue::TimestampNanosecond(Some(1), Some(Arc::from("UTC"))),
+ ));
+ let expr = logical2physical(&expr, &table_schema);
+ let candidate = FilterCandidateBuilder::new(expr, &file_schema, &table_schema)
+ .build(&metadata)
+ .expect("building candidate")
+ .expect("candidate expected");
+
+ let mut row_filter = DatafusionArrowPredicate::try_new(
+ candidate,
+ &file_schema,
+ &metadata,
+ Count::new(),
+ Time::new(),
+ Arc::clone(&schema_mapping),
+ )
+ .expect("creating filter predicate");
+
+ let filtered = row_filter.evaluate(first_rb.clone());
+ assert!(matches!(filtered, Ok(a) if a == BooleanArray::from(vec![false; 8])));
+
+ // Test all should pass
+ let expr = col("timestamp_col").gt(Expr::Literal(
+ ScalarValue::TimestampNanosecond(Some(0), Some(Arc::from("UTC"))),
+ ));
+ let expr = logical2physical(&expr, &table_schema);
+ let candidate = FilterCandidateBuilder::new(expr, &file_schema, &table_schema)
+ .build(&metadata)
+ .expect("building candidate")
+ .expect("candidate expected");
+
+ let mut row_filter = DatafusionArrowPredicate::try_new(
+ candidate,
+ &file_schema,
+ &metadata,
+ Count::new(),
+ Time::new(),
+ schema_mapping,
+ )
+ .expect("creating filter predicate");
+
+ let filtered = row_filter.evaluate(first_rb);
+ assert!(matches!(filtered, Ok(a) if a == BooleanArray::from(vec![true; 8])));
+ }
+
#[test]
fn test_remap_projection() {
let mut rng = thread_rng();
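As a minimal sketch of the evaluate path changed above (arrow-rs kernels only; the real DatafusionArrowPredicate evaluates an arbitrary PhysicalExpr, and the Int32-to-Int64 coercion here is purely illustrative): the projected batch is coerced to the table types via the stored schema mapping, then the predicate yields a BooleanArray row mask.

use arrow::array::{BooleanArray, Int32Array, Int64Array};
use arrow::compute::cast;
use arrow::compute::kernels::cmp::gt;
use arrow::datatypes::DataType;

fn main() -> Result<(), arrow::error::ArrowError> {
    // The projected file column decodes as Int32...
    let file_col = Int32Array::from(vec![1, 5, 9]);
    // ...while the predicate was planned against an Int64 table column.
    let coerced = cast(&file_col, &DataType::Int64)?;
    // Evaluate the (stand-in) predicate `col > 4` on the coerced column.
    let threshold = Int64Array::new_scalar(4);
    let mask: BooleanArray = gt(&coerced, &threshold)?;
    assert_eq!(mask, BooleanArray::from(vec![false, true, true]));
    Ok(())
}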
diff --git a/datafusion/core/src/datasource/schema_adapter.rs b/datafusion/core/src/datasource/schema_adapter.rs
index 77fde608fd..e8b64e9090 100644
--- a/datafusion/core/src/datasource/schema_adapter.rs
+++ b/datafusion/core/src/datasource/schema_adapter.rs
@@ -75,9 +75,21 @@ pub trait SchemaAdapter: Send + Sync {
/// Creates a `SchemaMapping` that can be used to cast or map the columns
/// from the file schema to the table schema.
-pub trait SchemaMapper: Send + Sync {
+pub trait SchemaMapper: Debug + Send + Sync {
/// Adapts a `RecordBatch` to match the `table_schema` using the stored mapping and conversions.
fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch>;
+
+ /// Adapts a [`RecordBatch`] that does not have all the columns from the
+ /// file schema.
+ ///
+ /// This method is used when applying a filter to a subset of the columns during
+ /// an `ArrowPredicate`.
+ ///
+ /// This method is slower than `map_batch` as it looks up columns by name.
+ fn map_partial_batch(
+ &self,
+ batch: RecordBatch,
+ ) -> datafusion_common::Result<RecordBatch>;
}
#[derive(Clone, Debug, Default)]
@@ -185,6 +197,31 @@ impl SchemaMapper for SchemaMapping {
let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?;
Ok(record_batch)
}
+
+ fn map_partial_batch(
+ &self,
+ batch: RecordBatch,
+ ) -> datafusion_common::Result<RecordBatch> {
+ let batch_cols = batch.columns().to_vec();
+ let schema = batch.schema();
+
+ let mut cols = vec![];
+ let mut fields = vec![];
+ for (i, f) in schema.fields().iter().enumerate() {
+ let table_field = self.table_schema.field_with_name(f.name());
+ if let Ok(tf) = table_field {
+ cols.push(cast(&batch_cols[i], tf.data_type())?);
+ fields.push(tf.clone());
+ }
+ }
+
+ // Necessary to handle empty batches
+ let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
+
+ let schema = Arc::new(Schema::new(fields));
+ let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?;
+ Ok(record_batch)
+ }
}
#[cfg(test)]
@@ -337,5 +374,12 @@ mod tests {
Ok(RecordBatch::try_new(schema, new_columns).unwrap())
}
+
+ fn map_partial_batch(
+ &self,
+ batch: RecordBatch,
+ ) -> datafusion_common::Result<RecordBatch> {
+ self.map_batch(batch)
+ }
}
}
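For readers outside the DataFusion tree, here is a self-contained restatement (assuming only arrow-rs; map_partial is a stand-in name, not the trait method itself) of what the SchemaMapping implementation above does: keep only the columns the table schema declares, cast each to the table type, and carry the row count explicitly so batches with no surviving columns remain valid.

use std::sync::Arc;
use arrow::array::{ArrayRef, Int32Array, StringArray};
use arrow::compute::cast;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::record_batch::{RecordBatch, RecordBatchOptions};

fn map_partial(table_schema: &Schema, batch: RecordBatch) -> Result<RecordBatch, ArrowError> {
    let schema = batch.schema();
    let mut cols: Vec<ArrayRef> = vec![];
    let mut fields: Vec<Field> = vec![];
    for (i, f) in schema.fields().iter().enumerate() {
        // Columns absent from the table schema are simply dropped.
        if let Ok(tf) = table_schema.field_with_name(f.name()) {
            cols.push(cast(batch.column(i), tf.data_type())?);
            fields.push(tf.clone());
        }
    }
    // Carry the row count explicitly so empty projections stay valid.
    let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
    RecordBatch::try_new_with_options(Arc::new(Schema::new(fields)), cols, &options)
}

fn main() -> Result<(), ArrowError> {
    // The file batch has columns "a" and "b"; the table only declares "a", as Int64.
    let batch = RecordBatch::try_from_iter(vec![
        ("a", Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef),
        ("b", Arc::new(StringArray::from(vec!["x", "y"])) as ArrayRef),
    ])?;
    let table_schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);
    let mapped = map_partial(&table_schema, batch)?;
    assert_eq!(mapped.num_columns(), 1);
    assert_eq!(mapped.column(0).data_type(), &DataType::Int64);
    Ok(())
}

The by-name lookup in the loop is why the doc comment above notes this path is slower than map_batch.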
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]