This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 8e2bfa4747 Fix predicate pushdown for custom SchemaAdapters (#15263)
8e2bfa4747 is described below
commit 8e2bfa474743e83ad31070410328f6b39d213361
Author: Adrian Garcia Badaracco <[email protected]>
AuthorDate: Wed Mar 19 14:27:33 2025 -0500
Fix predicate pushdown for custom SchemaAdapters (#15263)
* wip
* wip
* wip
* add tests
* wip
* wip
* fix
* fix
* fix
* better test
* more reverts
* fix
* Reduce Schema clones in predicate
* add more tests
* add another test
* Fix datafusion testing pin
* fix clippy
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
datafusion-testing | 1 -
datafusion/core/src/datasource/mod.rs | 7 -
.../core/src/datasource/physical_plan/parquet.rs | 321 ++++++++++++++++
datafusion/datasource-parquet/src/opener.rs | 3 +-
datafusion/datasource-parquet/src/row_filter.rs | 416 ++++++++-------------
datafusion/datasource/src/schema_adapter.rs | 98 +----
6 files changed, 476 insertions(+), 370 deletions(-)
diff --git a/datafusion-testing b/datafusion-testing
deleted file mode 160000
index 3462eaa787..0000000000
--- a/datafusion-testing
+++ /dev/null
@@ -1 +0,0 @@
-Subproject commit 3462eaa787459957e38df267a4a21f5bea605807
diff --git a/datafusion/core/src/datasource/mod.rs
b/datafusion/core/src/datasource/mod.rs
index 80783b4892..3602a603cd 100644
--- a/datafusion/core/src/datasource/mod.rs
+++ b/datafusion/core/src/datasource/mod.rs
@@ -262,12 +262,5 @@ mod tests {
Ok(RecordBatch::try_new(schema, new_columns).unwrap())
}
-
- fn map_partial_batch(
- &self,
- batch: RecordBatch,
- ) -> datafusion_common::Result<RecordBatch> {
- self.map_batch(batch)
- }
}
}
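With `map_partial_batch` removed from the `SchemaMapper` trait by this commit, a custom mapper
only has to implement `map_batch`. A minimal sketch of what such an implementation looks like
after this change (the `PassThroughMapper` name is illustrative, not part of this patch):

    use arrow::record_batch::RecordBatch;
    use datafusion_datasource::schema_adapter::SchemaMapper;

    #[derive(Debug)]
    struct PassThroughMapper;

    impl SchemaMapper for PassThroughMapper {
        fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch> {
            // A real mapper would reorder, cast, and null-fill columns so the batch
            // matches the (projected) table schema; this stub returns it unchanged.
            Ok(batch)
        }
    }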
diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs
b/datafusion/core/src/datasource/physical_plan/parquet.rs
index 888f3ad9e3..b5534d6b3d 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -224,6 +224,327 @@ mod tests {
)
}
+ #[tokio::test]
+ async fn test_pushdown_with_missing_column_in_file() {
+ let c1: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
+
+ let file_schema =
+ Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, true)]));
+
+ let table_schema = Arc::new(Schema::new(vec![
+ Field::new("c1", DataType::Int32, true),
+ Field::new("c2", DataType::Int32, true),
+ ]));
+
+ let batch = RecordBatch::try_new(file_schema.clone(), vec![c1]).unwrap();
+
+ // Since c2 is missing from the file and we didn't supply a custom `SchemaAdapterFactory`,
+ // the default behavior is to fill in missing columns with nulls.
+ // Thus this predicate will come back as false.
+ let filter = col("c2").eq(lit(1_i32));
+ let rt = RoundTrip::new()
+ .with_schema(table_schema.clone())
+ .with_predicate(filter.clone())
+ .with_pushdown_predicate()
+ .round_trip(vec![batch.clone()])
+ .await;
+ let total_rows = rt
+ .batches
+ .unwrap()
+ .iter()
+ .map(|b| b.num_rows())
+ .sum::<usize>();
+ assert_eq!(total_rows, 0, "Expected no rows to match the predicate");
+ let metrics = rt.parquet_exec.metrics().unwrap();
+ let metric = get_value(&metrics, "pushdown_rows_pruned");
+ assert_eq!(metric, 3, "Expected all rows to be pruned");
+
+ // If we explicitly allow nulls, the rest of the predicate should work
+ let filter = col("c2").is_null().and(col("c1").eq(lit(1_i32)));
+ let rt = RoundTrip::new()
+ .with_schema(table_schema.clone())
+ .with_predicate(filter.clone())
+ .with_pushdown_predicate()
+ .round_trip(vec![batch.clone()])
+ .await;
+ let batches = rt.batches.unwrap();
+ #[rustfmt::skip]
+ let expected = [
+ "+----+----+",
+ "| c1 | c2 |",
+ "+----+----+",
+ "| 1 | |",
+ "+----+----+",
+ ];
+ assert_batches_sorted_eq!(expected, &batches);
+ let metrics = rt.parquet_exec.metrics().unwrap();
+ let metric = get_value(&metrics, "pushdown_rows_pruned");
+ assert_eq!(metric, 2, "Expected 2 rows to be pruned");
+ }
+
+ #[tokio::test]
+ async fn test_pushdown_with_missing_column_in_file_multiple_types() {
+ let c1: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
+
+ let file_schema =
+ Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, true)]));
+
+ let table_schema = Arc::new(Schema::new(vec![
+ Field::new("c1", DataType::Int32, true),
+ Field::new("c2", DataType::Utf8, true),
+ ]));
+
+ let batch = RecordBatch::try_new(file_schema.clone(), vec![c1]).unwrap();
+
+ // Since c2 is missing from the file and we didn't supply a custom `SchemaAdapterFactory`,
+ // the default behavior is to fill in missing columns with nulls.
+ // Thus this predicate will come back as false.
+ let filter = col("c2").eq(lit("abc"));
+ let rt = RoundTrip::new()
+ .with_schema(table_schema.clone())
+ .with_predicate(filter.clone())
+ .with_pushdown_predicate()
+ .round_trip(vec![batch.clone()])
+ .await;
+ let total_rows = rt
+ .batches
+ .unwrap()
+ .iter()
+ .map(|b| b.num_rows())
+ .sum::<usize>();
+ assert_eq!(total_rows, 0, "Expected no rows to match the predicate");
+ let metrics = rt.parquet_exec.metrics().unwrap();
+ let metric = get_value(&metrics, "pushdown_rows_pruned");
+ assert_eq!(metric, 3, "Expected all rows to be pruned");
+
+ // If we explicitly allow nulls, the rest of the predicate should work
+ let filter = col("c2").is_null().and(col("c1").eq(lit(1_i32)));
+ let rt = RoundTrip::new()
+ .with_schema(table_schema.clone())
+ .with_predicate(filter.clone())
+ .with_pushdown_predicate()
+ .round_trip(vec![batch.clone()])
+ .await;
+ let batches = rt.batches.unwrap();
+ #[rustfmt::skip]
+ let expected = [
+ "+----+----+",
+ "| c1 | c2 |",
+ "+----+----+",
+ "| 1 | |",
+ "+----+----+",
+ ];
+ assert_batches_sorted_eq!(expected, &batches);
+ let metrics = rt.parquet_exec.metrics().unwrap();
+ let metric = get_value(&metrics, "pushdown_rows_pruned");
+ assert_eq!(metric, 2, "Expected 2 rows to be pruned");
+ }
+
+ #[tokio::test]
+ async fn test_pushdown_with_missing_middle_column() {
+ let c1: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
+ let c3 = Arc::new(Int32Array::from(vec![7, 8, 9]));
+
+ let file_schema = Arc::new(Schema::new(vec![
+ Field::new("c1", DataType::Int32, true),
+ Field::new("c3", DataType::Int32, true),
+ ]));
+
+ let table_schema = Arc::new(Schema::new(vec![
+ Field::new("c1", DataType::Int32, true),
+ Field::new("c2", DataType::Utf8, true),
+ Field::new("c3", DataType::Int32, true),
+ ]));
+
+ let batch = RecordBatch::try_new(file_schema.clone(), vec![c1, c3]).unwrap();
+
+ // Since c2 is missing from the file and we didn't supply a custom `SchemaAdapterFactory`,
+ // the default behavior is to fill in missing columns with nulls.
+ // Thus this predicate will come back as false.
+ let filter = col("c2").eq(lit("abc"));
+ let rt = RoundTrip::new()
+ .with_schema(table_schema.clone())
+ .with_predicate(filter.clone())
+ .with_pushdown_predicate()
+ .round_trip(vec![batch.clone()])
+ .await;
+ let total_rows = rt
+ .batches
+ .unwrap()
+ .iter()
+ .map(|b| b.num_rows())
+ .sum::<usize>();
+ assert_eq!(total_rows, 0, "Expected no rows to match the predicate");
+ let metrics = rt.parquet_exec.metrics().unwrap();
+ let metric = get_value(&metrics, "pushdown_rows_pruned");
+ assert_eq!(metric, 3, "Expected all rows to be pruned");
+
+ // If we explicitly allow nulls, the rest of the predicate should work
+ let filter = col("c2").is_null().and(col("c1").eq(lit(1_i32)));
+ let rt = RoundTrip::new()
+ .with_schema(table_schema.clone())
+ .with_predicate(filter.clone())
+ .with_pushdown_predicate()
+ .round_trip(vec![batch.clone()])
+ .await;
+ let batches = rt.batches.unwrap();
+ #[rustfmt::skip]
+ let expected = [
+ "+----+----+----+",
+ "| c1 | c2 | c3 |",
+ "+----+----+----+",
+ "| 1 | | 7 |",
+ "+----+----+----+",
+ ];
+ assert_batches_sorted_eq!(expected, &batches);
+ let metrics = rt.parquet_exec.metrics().unwrap();
+ let metric = get_value(&metrics, "pushdown_rows_pruned");
+ assert_eq!(metric, 2, "Expected 2 rows to be pruned");
+ }
+
+ #[tokio::test]
+ async fn test_pushdown_with_file_column_order_mismatch() {
+ let c3 = Arc::new(Int32Array::from(vec![7, 8, 9]));
+
+ let file_schema = Arc::new(Schema::new(vec![
+ Field::new("c3", DataType::Int32, true),
+ Field::new("c3", DataType::Int32, true),
+ ]));
+
+ let table_schema = Arc::new(Schema::new(vec![
+ Field::new("c1", DataType::Int32, true),
+ Field::new("c2", DataType::Utf8, true),
+ Field::new("c3", DataType::Int32, true),
+ ]));
+
+ let batch =
+ RecordBatch::try_new(file_schema.clone(), vec![c3.clone(), c3]).unwrap();
+
+ // Since c2 is missing from the file and we didn't supply a custom `SchemaAdapterFactory`,
+ // the default behavior is to fill in missing columns with nulls.
+ // Thus this predicate will come back as false.
+ let filter = col("c2").eq(lit("abc"));
+ let rt = RoundTrip::new()
+ .with_schema(table_schema.clone())
+ .with_predicate(filter.clone())
+ .with_pushdown_predicate()
+ .round_trip(vec![batch.clone()])
+ .await;
+ let total_rows = rt
+ .batches
+ .unwrap()
+ .iter()
+ .map(|b| b.num_rows())
+ .sum::<usize>();
+ assert_eq!(total_rows, 0, "Expected no rows to match the predicate");
+ let metrics = rt.parquet_exec.metrics().unwrap();
+ let metric = get_value(&metrics, "pushdown_rows_pruned");
+ assert_eq!(metric, 3, "Expected all rows to be pruned");
+
+ // If we explicitly allow nulls, the rest of the predicate should work
+ let filter = col("c2").is_null().and(col("c3").eq(lit(7_i32)));
+ let rt = RoundTrip::new()
+ .with_schema(table_schema.clone())
+ .with_predicate(filter.clone())
+ .with_pushdown_predicate()
+ .round_trip(vec![batch.clone()])
+ .await;
+ let batches = rt.batches.unwrap();
+ #[rustfmt::skip]
+ let expected = [
+ "+----+----+----+",
+ "| c1 | c2 | c3 |",
+ "+----+----+----+",
+ "| | | 7 |",
+ "+----+----+----+",
+ ];
+ assert_batches_sorted_eq!(expected, &batches);
+ let metrics = rt.parquet_exec.metrics().unwrap();
+ let metric = get_value(&metrics, "pushdown_rows_pruned");
+ assert_eq!(metric, 2, "Expected 2 rows to be pruned");
+ }
+
+ #[tokio::test]
+ async fn test_pushdown_with_missing_column_nested_conditions() {
+ // Create test data with c1 and c3 columns
+ let c1: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
+ let c3: ArrayRef = Arc::new(Int32Array::from(vec![10, 20, 30, 40, 50]));
+
+ let file_schema = Arc::new(Schema::new(vec![
+ Field::new("c1", DataType::Int32, true),
+ Field::new("c3", DataType::Int32, true),
+ ]));
+
+ let table_schema = Arc::new(Schema::new(vec![
+ Field::new("c1", DataType::Int32, true),
+ Field::new("c2", DataType::Int32, true),
+ Field::new("c3", DataType::Int32, true),
+ ]));
+
+ let batch = RecordBatch::try_new(file_schema.clone(), vec![c1, c3]).unwrap();
+
+ // Test with complex nested AND/OR:
+ // (c1 = 1 OR c2 = 5) AND (c3 = 10 OR c2 IS NULL)
+ // Should return 1 row where c1=1 AND c3=10 (since c2 IS NULL is always true)
+ let filter = col("c1")
+ .eq(lit(1_i32))
+ .or(col("c2").eq(lit(5_i32)))
+ .and(col("c3").eq(lit(10_i32)).or(col("c2").is_null()));
+
+ let rt = RoundTrip::new()
+ .with_schema(table_schema.clone())
+ .with_predicate(filter.clone())
+ .with_pushdown_predicate()
+ .round_trip(vec![batch.clone()])
+ .await;
+
+ let batches = rt.batches.unwrap();
+ #[rustfmt::skip]
+ let expected = [
+ "+----+----+----+",
+ "| c1 | c2 | c3 |",
+ "+----+----+----+",
+ "| 1 | | 10 |",
+ "+----+----+----+",
+ ];
+ assert_batches_sorted_eq!(expected, &batches);
+ let metrics = rt.parquet_exec.metrics().unwrap();
+ let metric = get_value(&metrics, "pushdown_rows_pruned");
+ assert_eq!(metric, 4, "Expected 4 rows to be pruned");
+
+ // Test a more complex nested condition:
+ // (c1 < 3 AND c2 IS NOT NULL) OR (c3 > 20 AND c2 IS NULL)
+ // First part should return 0 rows (c2 IS NOT NULL is always false)
+ // Second part should return rows where c3 > 20 (3 rows: where c3 is 30, 40, 50)
+ let filter = col("c1")
+ .lt(lit(3_i32))
+ .and(col("c2").is_not_null())
+ .or(col("c3").gt(lit(20_i32)).and(col("c2").is_null()));
+
+ let rt = RoundTrip::new()
+ .with_schema(table_schema)
+ .with_predicate(filter.clone())
+ .with_pushdown_predicate()
+ .round_trip(vec![batch])
+ .await;
+
+ let batches = rt.batches.unwrap();
+ #[rustfmt::skip]
+ let expected = [
+ "+----+----+----+",
+ "| c1 | c2 | c3 |",
+ "+----+----+----+",
+ "| 3 | | 30 |",
+ "| 4 | | 40 |",
+ "| 5 | | 50 |",
+ "+----+----+----+",
+ ];
+ assert_batches_sorted_eq!(expected, &batches);
+ let metrics = rt.parquet_exec.metrics().unwrap();
+ let metric = get_value(&metrics, "pushdown_rows_pruned");
+ assert_eq!(metric, 2, "Expected 2 rows to be pruned");
+ }
+
#[tokio::test]
async fn evolved_schema() {
let c1: ArrayRef =
diff --git a/datafusion/datasource-parquet/src/opener.rs
b/datafusion/datasource-parquet/src/opener.rs
index 8257a796b6..732fef47d5 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -104,6 +104,7 @@ impl FileOpener for ParquetOpener {
let projected_schema =
SchemaRef::from(self.table_schema.project(&self.projection)?);
+ let schema_adapter_factory = Arc::clone(&self.schema_adapter_factory);
let schema_adapter = self
.schema_adapter_factory
.create(projected_schema, Arc::clone(&self.table_schema));
@@ -164,7 +165,7 @@ impl FileOpener for ParquetOpener {
builder.metadata(),
reorder_predicates,
&file_metrics,
- Arc::clone(&schema_mapping),
+ &schema_adapter_factory,
);
match row_filter {
diff --git a/datafusion/datasource-parquet/src/row_filter.rs
b/datafusion/datasource-parquet/src/row_filter.rs
index 39fcecf37c..da6bf114d7 100644
--- a/datafusion/datasource-parquet/src/row_filter.rs
+++ b/datafusion/datasource-parquet/src/row_filter.rs
@@ -64,7 +64,7 @@ use std::collections::BTreeSet;
use std::sync::Arc;
use arrow::array::BooleanArray;
-use arrow::datatypes::{DataType, Schema};
+use arrow::datatypes::{DataType, Schema, SchemaRef};
use arrow::error::{ArrowError, Result as ArrowResult};
use arrow::record_batch::RecordBatch;
use parquet::arrow::arrow_reader::{ArrowPredicate, RowFilter};
@@ -72,12 +72,10 @@ use parquet::arrow::ProjectionMask;
use parquet::file::metadata::ParquetMetaData;
use datafusion_common::cast::as_boolean_array;
-use datafusion_common::tree_node::{
- Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
TreeNodeRewriter,
-};
-use datafusion_common::{arrow_datafusion_err, DataFusionError, Result,
ScalarValue};
-use datafusion_datasource::schema_adapter::SchemaMapper;
-use datafusion_physical_expr::expressions::{Column, Literal};
+use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion,
TreeNodeVisitor};
+use datafusion_common::Result;
+use datafusion_datasource::schema_adapter::{SchemaAdapterFactory,
SchemaMapper};
+use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::utils::reassign_predicate_columns;
use datafusion_physical_expr::{split_conjunction, PhysicalExpr};
@@ -102,8 +100,6 @@ pub(crate) struct DatafusionArrowPredicate {
/// Path to the columns in the parquet schema required to evaluate the
/// expression
projection_mask: ProjectionMask,
- /// Columns required to evaluate the expression in the arrow schema
- projection: Vec<usize>,
/// how many rows were filtered out by this predicate
rows_pruned: metrics::Count,
/// how many rows passed this predicate
@@ -111,34 +107,24 @@ pub(crate) struct DatafusionArrowPredicate {
/// how long was spent evaluating this predicate
time: metrics::Time,
/// used to perform type coercion while filtering rows
- schema_mapping: Arc<dyn SchemaMapper>,
+ schema_mapper: Arc<dyn SchemaMapper>,
}
impl DatafusionArrowPredicate {
/// Create a new `DatafusionArrowPredicate` from a `FilterCandidate`
pub fn try_new(
candidate: FilterCandidate,
- schema: &Schema,
metadata: &ParquetMetaData,
rows_pruned: metrics::Count,
rows_matched: metrics::Count,
time: metrics::Time,
- schema_mapping: Arc<dyn SchemaMapper>,
) -> Result<Self> {
- let schema = Arc::new(schema.project(&candidate.projection)?);
- let physical_expr = reassign_predicate_columns(candidate.expr,
&schema, true)?;
-
- // ArrowPredicate::evaluate is passed columns in the order they appear
in the file
- // If the predicate has multiple columns, we therefore must project
the columns based
- // on the order they appear in the file
- let projection = match candidate.projection.len() {
- 0 | 1 => vec![],
- 2.. => remap_projection(&candidate.projection),
- };
+ let projected_schema = Arc::clone(&candidate.filter_schema);
+ let physical_expr =
+ reassign_predicate_columns(candidate.expr, &projected_schema,
true)?;
Ok(Self {
physical_expr,
- projection,
projection_mask: ProjectionMask::roots(
metadata.file_metadata().schema_descr(),
candidate.projection,
@@ -146,7 +132,7 @@ impl DatafusionArrowPredicate {
rows_pruned,
rows_matched,
time,
- schema_mapping,
+ schema_mapper: candidate.schema_mapper,
})
}
}
@@ -156,12 +142,8 @@ impl ArrowPredicate for DatafusionArrowPredicate {
&self.projection_mask
}
- fn evaluate(&mut self, mut batch: RecordBatch) ->
ArrowResult<BooleanArray> {
- if !self.projection.is_empty() {
- batch = batch.project(&self.projection)?;
- };
-
- let batch = self.schema_mapping.map_partial_batch(batch)?;
+ fn evaluate(&mut self, batch: RecordBatch) -> ArrowResult<BooleanArray> {
+ let batch = self.schema_mapper.map_batch(batch)?;
// scoped timer updates on drop
let mut timer = self.time.timer();
@@ -194,9 +176,22 @@ impl ArrowPredicate for DatafusionArrowPredicate {
/// See the module level documentation for more information.
pub(crate) struct FilterCandidate {
expr: Arc<dyn PhysicalExpr>,
+ /// Estimate for the total number of bytes that will need to be processed
+ /// to evaluate this filter. This is used to estimate the cost of evaluating
+ /// the filter and to order the filters when `reorder_predicates` is true.
+ /// This is generated by summing the compressed size of all columns that the filter references.
required_bytes: usize,
+ /// Can this filter use an index (e.g. a page index) to prune rows?
can_use_index: bool,
+ /// The projection to read from the file schema to get the columns
+ /// required to pass through a `SchemaMapper` to the table schema
+ /// upon which we then evaluate the filter expression.
projection: Vec<usize>,
+ /// A `SchemaMapper` used to map batches read from the file schema to
+ /// the filter's projection of the table schema.
+ schema_mapper: Arc<dyn SchemaMapper>,
+ /// The projected table schema that this filter references
+ filter_schema: SchemaRef,
}
/// Helper to build a `FilterCandidate`.
@@ -220,41 +215,40 @@ pub(crate) struct FilterCandidate {
/// but old files do not have the columns.
///
/// When a file is missing a column from the table schema, the value of the
-/// missing column is filled in with `NULL` via a `SchemaAdapter`.
+/// missing column is filled in by a `SchemaAdapter` (by default as `NULL`).
///
/// When a predicate is pushed down to the parquet reader, the predicate is
-/// evaluated in the context of the file schema. If the predicate references a
-/// column that is in the table schema but not in the file schema, the column
-/// reference must be rewritten to a literal expression that represents the
-/// `NULL` value that would be produced by the `SchemaAdapter`.
-///
-/// For example, if:
-/// * The table schema is `id, name, address`
-/// * The file schema is `id, name` (missing the `address` column)
-/// * predicate is `address = 'foo'`
-///
-/// When evaluating the predicate as a filter on the parquet file, the
predicate
-/// must be rewritten to `NULL = 'foo'` as the `address` column will be filled
-/// in with `NULL` values during the rest of the evaluation.
-struct FilterCandidateBuilder<'a> {
+/// evaluated in the context of the file schema.
+/// For each predicate we build a filter schema which is the projection of the table
+/// schema that contains only the columns that this filter references.
+/// If any columns from the file schema are missing from a particular file they are
+/// added by the `SchemaAdapter`, by default as `NULL`.
+struct FilterCandidateBuilder {
expr: Arc<dyn PhysicalExpr>,
- /// The schema of this parquet file
- file_schema: &'a Schema,
+ /// The schema of this parquet file.
+ /// Columns may have different types from the table schema and there may be
+ /// columns in the file schema that are not in the table schema or columns that
+ /// are in the table schema that are not in the file schema.
+ file_schema: SchemaRef,
/// The schema of the table (merged schema) -- columns may be in different
/// order than in the file and have columns that are not in the file schema
- table_schema: &'a Schema,
+ table_schema: SchemaRef,
+ /// A `SchemaAdapterFactory` used to map the file schema to the table schema.
+ schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
}
-impl<'a> FilterCandidateBuilder<'a> {
+impl FilterCandidateBuilder {
pub fn new(
expr: Arc<dyn PhysicalExpr>,
- file_schema: &'a Schema,
- table_schema: &'a Schema,
+ file_schema: Arc<Schema>,
+ table_schema: Arc<Schema>,
+ schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
) -> Self {
Self {
expr,
file_schema,
table_schema,
+ schema_adapter_factory,
}
}
@@ -266,20 +260,32 @@ impl<'a> FilterCandidateBuilder<'a> {
/// * `Ok(None)` if the expression cannot be used as an ArrowFilter
/// * `Err(e)` if an error occurs while building the candidate
pub fn build(self, metadata: &ParquetMetaData) ->
Result<Option<FilterCandidate>> {
- let Some((required_indices, rewritten_expr)) =
- pushdown_columns(self.expr, self.file_schema, self.table_schema)?
+ let Some(required_indices_into_table_schema) =
+ pushdown_columns(&self.expr, &self.table_schema)?
else {
return Ok(None);
};
- let required_bytes = size_of_columns(&required_indices, metadata)?;
- let can_use_index = columns_sorted(&required_indices, metadata)?;
+ let projected_table_schema = Arc::new(
+ self.table_schema
+ .project(&required_indices_into_table_schema)?,
+ );
+
+ let (schema_mapper, projection_into_file_schema) = self
+ .schema_adapter_factory
+ .create(Arc::clone(&projected_table_schema), self.table_schema)
+ .map_schema(&self.file_schema)?;
+
+ let required_bytes = size_of_columns(&projection_into_file_schema, metadata)?;
+ let can_use_index = columns_sorted(&projection_into_file_schema, metadata)?;
Ok(Some(FilterCandidate {
- expr: rewritten_expr,
+ expr: self.expr,
required_bytes,
can_use_index,
- projection: required_indices.into_iter().collect(),
+ projection: projection_into_file_schema,
+ schema_mapper: Arc::clone(&schema_mapper),
+ filter_schema: Arc::clone(&projected_table_schema),
}))
}
}
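To illustrate the projection flow described in the comments above, here is a hedged sketch (not part
of this patch; it relies only on the public `DefaultSchemaAdapterFactory` API that the tests further
down also use). For a filter referencing `c2` and `c3` of a `c1, c2, c3` table, the filter schema is
the projection `[c2, c3]`, and `map_schema` against a file containing only `c1, c3` returns a mapper
that null-fills `c2` plus the projection into the file schema (just `c3`):

    use std::sync::Arc;
    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion_datasource::schema_adapter::{
        DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory,
    };

    fn filter_schema_example() -> datafusion_common::Result<()> {
        let table_schema = Arc::new(Schema::new(vec![
            Field::new("c1", DataType::Int32, true),
            Field::new("c2", DataType::Utf8, true),
            Field::new("c3", DataType::Int32, true),
        ]));
        // The filter references c2 and c3, so the filter schema is that
        // projection of the table schema.
        let filter_schema = Arc::new(table_schema.project(&[1, 2])?);

        // The file is missing c2 entirely.
        let file_schema = Schema::new(vec![
            Field::new("c1", DataType::Int32, true),
            Field::new("c3", DataType::Int32, true),
        ]);

        let (_mapper, projection_into_file) = DefaultSchemaAdapterFactory
            .create(Arc::clone(&filter_schema), Arc::clone(&table_schema))
            .map_schema(&file_schema)?;
        // Only c3 (index 1 in the file schema) needs to be read for this filter;
        // the mapper fills c2 with NULLs when mapping batches.
        assert_eq!(projection_into_file, vec![1]);
        Ok(())
    }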
@@ -294,33 +300,29 @@ struct PushdownChecker<'schema> {
/// Does the expression reference any columns that are in the table
/// schema but not in the file schema?
projected_columns: bool,
- // the indices of all the columns found within the given expression which
exist inside the given
- // [`file_schema`]
- required_column_indices: BTreeSet<usize>,
- file_schema: &'schema Schema,
+ // Indices into the table schema of the columns required to evaluate the expression
+ required_columns: BTreeSet<usize>,
table_schema: &'schema Schema,
}
impl<'schema> PushdownChecker<'schema> {
- fn new(file_schema: &'schema Schema, table_schema: &'schema Schema) ->
Self {
+ fn new(table_schema: &'schema Schema) -> Self {
Self {
non_primitive_columns: false,
projected_columns: false,
- required_column_indices: BTreeSet::default(),
- file_schema,
+ required_columns: BTreeSet::default(),
table_schema,
}
}
fn check_single_column(&mut self, column_name: &str) ->
Option<TreeNodeRecursion> {
- if let Ok(idx) = self.file_schema.index_of(column_name) {
- self.required_column_indices.insert(idx);
-
- if DataType::is_nested(self.file_schema.field(idx).data_type()) {
+ if let Ok(idx) = self.table_schema.index_of(column_name) {
+ self.required_columns.insert(idx);
+ if DataType::is_nested(self.table_schema.field(idx).data_type()) {
self.non_primitive_columns = true;
return Some(TreeNodeRecursion::Jump);
}
- } else if self.table_schema.index_of(column_name).is_err() {
+ } else {
// If the column does not exist in the (un-projected) table schema
then
// it must be a projected column.
self.projected_columns = true;
@@ -336,82 +338,40 @@ impl<'schema> PushdownChecker<'schema> {
}
}
-impl TreeNodeRewriter for PushdownChecker<'_> {
+impl TreeNodeVisitor<'_> for PushdownChecker<'_> {
type Node = Arc<dyn PhysicalExpr>;
- fn f_down(
- &mut self,
- node: Arc<dyn PhysicalExpr>,
- ) -> Result<Transformed<Arc<dyn PhysicalExpr>>> {
+ fn f_down(&mut self, node: &Self::Node) -> Result<TreeNodeRecursion> {
if let Some(column) = node.as_any().downcast_ref::<Column>() {
if let Some(recursion) = self.check_single_column(column.name()) {
- return Ok(Transformed::new(node, false, recursion));
+ return Ok(recursion);
}
}
- Ok(Transformed::no(node))
- }
-
- /// After visiting all children, rewrite column references to nulls if
- /// they are not in the file schema.
- /// We do this because they won't be relevant if they're not in the file
schema, since that's
- /// the only thing we're dealing with here as this is only used for the
parquet pushdown during
- /// scanning
- fn f_up(
- &mut self,
- expr: Arc<dyn PhysicalExpr>,
- ) -> Result<Transformed<Arc<dyn PhysicalExpr>>> {
- if let Some(column) = expr.as_any().downcast_ref::<Column>() {
- // if the expression is a column, is it in the file schema?
- if self.file_schema.field_with_name(column.name()).is_err() {
- return self
- .table_schema
- .field_with_name(column.name())
- .and_then(|field| {
- // Replace the column reference with a NULL (using the
type from the table schema)
- // e.g. `column = 'foo'` is rewritten be transformed
to `NULL = 'foo'`
- //
- // See comments on `FilterCandidateBuilder` for more
information
- let null_value =
ScalarValue::try_from(field.data_type())?;
- Ok(Transformed::yes(Arc::new(Literal::new(null_value))
as _))
- })
- // If the column is not in the table schema, should throw
the error
- .map_err(|e| arrow_datafusion_err!(e));
- }
- }
-
- Ok(Transformed::no(expr))
+ Ok(TreeNodeRecursion::Continue)
}
}
-type ProjectionAndExpr = (BTreeSet<usize>, Arc<dyn PhysicalExpr>);
-
// Checks if a given expression can be pushed down into `DataSourceExec` as opposed to being evaluated
// post-parquet-scan in a `FilterExec`. If it can be pushed down, this returns the indices of all the
// columns in the given expression so that they can be used in the parquet scan.
fn pushdown_columns(
- expr: Arc<dyn PhysicalExpr>,
- file_schema: &Schema,
+ expr: &Arc<dyn PhysicalExpr>,
table_schema: &Schema,
-) -> Result<Option<ProjectionAndExpr>> {
- let mut checker = PushdownChecker::new(file_schema, table_schema);
-
- let expr = expr.rewrite(&mut checker).data()?;
-
- Ok((!checker.prevents_pushdown()).then_some((checker.required_column_indices, expr)))
+) -> Result<Option<Vec<usize>>> {
+ let mut checker = PushdownChecker::new(table_schema);
+ expr.visit(&mut checker)?;
+ Ok((!checker.prevents_pushdown())
+ .then_some(checker.required_columns.into_iter().collect()))
}
/// Creates a PushdownChecker for a single use to check a given column with the given schemas. Used
/// to check preemptively if a column name would prevent pushdown.
/// Effectively does the inverse of [`pushdown_columns`], but with a single given column
/// (instead of traversing the entire tree to determine this)
-fn would_column_prevent_pushdown(
- column_name: &str,
- file_schema: &Schema,
- table_schema: &Schema,
-) -> bool {
- let mut checker = PushdownChecker::new(file_schema, table_schema);
+fn would_column_prevent_pushdown(column_name: &str, table_schema: &Schema) -> bool {
+ let mut checker = PushdownChecker::new(table_schema);
// the return of this is only used for [`PushdownChecker::f_down()`], so
we can safely ignore
// it here. I'm just verifying we know the return type of this so nobody
accidentally changes
@@ -427,14 +387,13 @@ fn would_column_prevent_pushdown(
/// Otherwise, true.
pub fn can_expr_be_pushed_down_with_schemas(
expr: &datafusion_expr::Expr,
- file_schema: &Schema,
+ _file_schema: &Schema,
table_schema: &Schema,
) -> bool {
let mut can_be_pushed = true;
expr.apply(|expr| match expr {
datafusion_expr::Expr::Column(column) => {
- can_be_pushed &=
- !would_column_prevent_pushdown(column.name(), file_schema, table_schema);
+ can_be_pushed &= !would_column_prevent_pushdown(column.name(), table_schema);
Ok(if can_be_pushed {
TreeNodeRecursion::Jump
} else {
@@ -447,41 +406,12 @@ pub fn can_expr_be_pushed_down_with_schemas(
can_be_pushed
}
-/// Computes the projection required to go from the file's schema order to the
projected
-/// order expected by this filter
-///
-/// Effectively this computes the rank of each element in `src`
-fn remap_projection(src: &[usize]) -> Vec<usize> {
- let len = src.len();
-
- // Compute the column mapping from projected order to file order
- // i.e. the indices required to sort projected schema into the file schema
- //
- // e.g. projection: [5, 9, 0] -> [2, 0, 1]
- let mut sorted_indexes: Vec<_> = (0..len).collect();
- sorted_indexes.sort_unstable_by_key(|x| src[*x]);
-
- // Compute the mapping from schema order to projected order
- // i.e. the indices required to sort file schema into the projected schema
- //
- // Above we computed the order of the projected schema according to the
file
- // schema, and so we can use this as the comparator
- //
- // e.g. sorted_indexes [2, 0, 1] -> [1, 2, 0]
- let mut projection: Vec<_> = (0..len).collect();
- projection.sort_unstable_by_key(|x| sorted_indexes[*x]);
- projection
-}
-
/// Calculate the total compressed size of all `Column`'s required for
/// predicate `Expr`.
///
/// This value represents the total amount of IO required to evaluate the
/// predicate.
-fn size_of_columns(
- columns: &BTreeSet<usize>,
- metadata: &ParquetMetaData,
-) -> Result<usize> {
+fn size_of_columns(columns: &[usize], metadata: &ParquetMetaData) -> Result<usize> {
let mut total_size = 0;
let row_groups = metadata.row_groups();
for idx in columns {
@@ -498,10 +428,7 @@ fn size_of_columns(
///
/// Sorted columns may be queried more efficiently in the presence of
/// a PageIndex.
-fn columns_sorted(
- _columns: &BTreeSet<usize>,
- _metadata: &ParquetMetaData,
-) -> Result<bool> {
+fn columns_sorted(_columns: &[usize], _metadata: &ParquetMetaData) -> Result<bool> {
// TODO How do we know this?
Ok(false)
}
@@ -522,12 +449,12 @@ fn columns_sorted(
/// `a = 1` and `c = 3`.
pub fn build_row_filter(
expr: &Arc<dyn PhysicalExpr>,
- file_schema: &Schema,
- table_schema: &Schema,
+ file_schema: &SchemaRef,
+ table_schema: &SchemaRef,
metadata: &ParquetMetaData,
reorder_predicates: bool,
file_metrics: &ParquetFileMetrics,
- schema_mapping: Arc<dyn SchemaMapper>,
+ schema_adapter_factory: &Arc<dyn SchemaAdapterFactory>,
) -> Result<Option<RowFilter>> {
let rows_pruned = &file_metrics.pushdown_rows_pruned;
let rows_matched = &file_metrics.pushdown_rows_matched;
@@ -541,8 +468,13 @@ pub fn build_row_filter(
let mut candidates: Vec<FilterCandidate> = predicates
.into_iter()
.map(|expr| {
- FilterCandidateBuilder::new(Arc::clone(expr), file_schema,
table_schema)
- .build(metadata)
+ FilterCandidateBuilder::new(
+ Arc::clone(expr),
+ Arc::clone(file_schema),
+ Arc::clone(table_schema),
+ Arc::clone(schema_adapter_factory),
+ )
+ .build(metadata)
})
.collect::<Result<Vec<_>, _>>()?
.into_iter()
@@ -568,12 +500,10 @@ pub fn build_row_filter(
.map(|candidate| {
DatafusionArrowPredicate::try_new(
candidate,
- file_schema,
metadata,
rows_pruned.clone(),
rows_matched.clone(),
time.clone(),
- Arc::clone(&schema_mapping),
)
.map(|pred| Box::new(pred) as _)
})
@@ -584,19 +514,17 @@ pub fn build_row_filter(
#[cfg(test)]
mod test {
use super::*;
- use datafusion_datasource::schema_adapter::{
- DefaultSchemaAdapterFactory, SchemaAdapterFactory,
- };
+ use datafusion_common::ScalarValue;
use arrow::datatypes::{Field, Fields, TimeUnit::Nanosecond};
- use datafusion_expr::{cast, col, lit, Expr};
+ use datafusion_datasource::schema_adapter::DefaultSchemaAdapterFactory;
+ use datafusion_expr::{col, Expr};
use datafusion_physical_expr::planner::logical2physical;
use datafusion_physical_plan::metrics::{Count, Time};
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::arrow::parquet_to_arrow_schema;
use parquet::file::reader::{FileReader, SerializedFileReader};
- use rand::prelude::*;
// We should ignore predicate that read non-primitive columns
#[test]
@@ -616,51 +544,19 @@ mod test {
let expr = col("int64_list").is_not_null();
let expr = logical2physical(&expr, &table_schema);
- let candidate = FilterCandidateBuilder::new(expr, &table_schema,
&table_schema)
- .build(metadata)
- .expect("building candidate");
-
- assert!(candidate.is_none());
- }
-
- // If a column exists in the table schema but not the file schema it
should be rewritten to a null expression
- #[test]
- fn test_filter_candidate_builder_rewrite_missing_column() {
- let testdata = datafusion_common::test_util::parquet_test_data();
- let file =
std::fs::File::open(format!("{testdata}/alltypes_plain.parquet"))
- .expect("opening file");
-
- let reader = SerializedFileReader::new(file).expect("creating reader");
-
- let metadata = reader.metadata();
-
- let table_schema =
- parquet_to_arrow_schema(metadata.file_metadata().schema_descr(),
None)
- .expect("parsing schema");
-
- let file_schema = Schema::new(vec![
- Field::new("bigint_col", DataType::Int64, true),
- Field::new("float_col", DataType::Float32, true),
- ]);
-
- // The parquet file with `file_schema` just has `bigint_col` and
`float_col` column, and don't have the `int_col`
- let expr = col("bigint_col").eq(cast(col("int_col"), DataType::Int64));
- let expr = logical2physical(&expr, &table_schema);
- let expected_candidate_expr =
- col("bigint_col").eq(cast(lit(ScalarValue::Int32(None)),
DataType::Int64));
- let expected_candidate_expr =
- logical2physical(&expected_candidate_expr, &table_schema);
-
- let candidate = FilterCandidateBuilder::new(expr, &file_schema,
&table_schema)
- .build(metadata)
- .expect("building candidate");
+ let schema_adapter_factory = Arc::new(DefaultSchemaAdapterFactory);
+ let table_schema = Arc::new(table_schema.clone());
- assert!(candidate.is_some());
+ let candidate = FilterCandidateBuilder::new(
+ expr,
+ table_schema.clone(),
+ table_schema,
+ schema_adapter_factory,
+ )
+ .build(metadata)
+ .expect("building candidate");
- assert_eq!(
- candidate.unwrap().expr.to_string(),
- expected_candidate_expr.to_string()
- );
+ assert!(candidate.is_none());
}
#[test]
@@ -682,42 +578,43 @@ mod test {
false,
)]);
- let table_ref = Arc::new(table_schema.clone());
- let schema_adapter =
- DefaultSchemaAdapterFactory.create(Arc::clone(&table_ref),
table_ref);
- let (schema_mapping, _) = schema_adapter
- .map_schema(&file_schema)
- .expect("creating schema mapping");
-
- let mut parquet_reader =
parquet_reader_builder.build().expect("building reader");
-
- // Parquet file is small, we only need 1 record batch
- let first_rb = parquet_reader
- .next()
- .expect("expected record batch")
- .expect("expected error free record batch");
-
// Test all should fail
let expr = col("timestamp_col").lt(Expr::Literal(
ScalarValue::TimestampNanosecond(Some(1), Some(Arc::from("UTC"))),
));
let expr = logical2physical(&expr, &table_schema);
- let candidate = FilterCandidateBuilder::new(expr, &file_schema,
&table_schema)
- .build(&metadata)
- .expect("building candidate")
- .expect("candidate expected");
+ let schema_adapter_factory = Arc::new(DefaultSchemaAdapterFactory);
+ let table_schema = Arc::new(table_schema.clone());
+ let candidate = FilterCandidateBuilder::new(
+ expr,
+ file_schema.clone(),
+ table_schema.clone(),
+ schema_adapter_factory,
+ )
+ .build(&metadata)
+ .expect("building candidate")
+ .expect("candidate expected");
let mut row_filter = DatafusionArrowPredicate::try_new(
candidate,
- &file_schema,
&metadata,
Count::new(),
Count::new(),
Time::new(),
- Arc::clone(&schema_mapping),
)
.expect("creating filter predicate");
+ let mut parquet_reader = parquet_reader_builder
+ .with_projection(row_filter.projection().clone())
+ .build()
+ .expect("building reader");
+
+ // Parquet file is small, we only need 1 record batch
+ let first_rb = parquet_reader
+ .next()
+ .expect("expected record batch")
+ .expect("expected error free record batch");
+
let filtered = row_filter.evaluate(first_rb.clone());
assert!(matches!(filtered, Ok(a) if a ==
BooleanArray::from(vec![false; 8])));
@@ -726,19 +623,23 @@ mod test {
ScalarValue::TimestampNanosecond(Some(0), Some(Arc::from("UTC"))),
));
let expr = logical2physical(&expr, &table_schema);
- let candidate = FilterCandidateBuilder::new(expr, &file_schema,
&table_schema)
- .build(&metadata)
- .expect("building candidate")
- .expect("candidate expected");
+ let schema_adapter_factory = Arc::new(DefaultSchemaAdapterFactory);
+ let candidate = FilterCandidateBuilder::new(
+ expr,
+ file_schema,
+ table_schema,
+ schema_adapter_factory,
+ )
+ .build(&metadata)
+ .expect("building candidate")
+ .expect("candidate expected");
let mut row_filter = DatafusionArrowPredicate::try_new(
candidate,
- &file_schema,
&metadata,
Count::new(),
Count::new(),
Time::new(),
- schema_mapping,
)
.expect("creating filter predicate");
@@ -746,24 +647,6 @@ mod test {
assert!(matches!(filtered, Ok(a) if a == BooleanArray::from(vec![true;
8])));
}
- #[test]
- fn test_remap_projection() {
- let mut rng = thread_rng();
- for _ in 0..100 {
- // A random selection of column indexes in arbitrary order
- let projection: Vec<_> = (0..100).map(|_| rng.gen()).collect();
-
- // File order is the projection sorted
- let mut file_order = projection.clone();
- file_order.sort_unstable();
-
- let remap = remap_projection(&projection);
- // Applying the remapped projection to the file order should yield
the original
- let remapped: Vec<_> = remap.iter().map(|r|
file_order[*r]).collect();
- assert_eq!(projection, remapped)
- }
- }
-
#[test]
fn nested_data_structures_prevent_pushdown() {
let table_schema = get_basic_table_schema();
@@ -803,9 +686,10 @@ mod test {
fn basic_expr_doesnt_prevent_pushdown() {
let table_schema = get_basic_table_schema();
- let file_schema = Schema::new(vec![Field::new("str_col",
DataType::Utf8, true)]);
+ let file_schema =
+ Schema::new(vec![Field::new("string_col", DataType::Utf8, true)]);
- let expr = col("str_col").is_null();
+ let expr = col("string_col").is_null();
assert!(can_expr_be_pushed_down_with_schemas(
&expr,
@@ -819,13 +703,13 @@ mod test {
let table_schema = get_basic_table_schema();
let file_schema = Schema::new(vec![
- Field::new("str_col", DataType::Utf8, true),
- Field::new("int_col", DataType::UInt64, true),
+ Field::new("string_col", DataType::Utf8, true),
+ Field::new("bigint_col", DataType::Int64, true),
]);
- let expr = col("str_col")
+ let expr = col("string_col")
.is_not_null()
- .or(col("int_col").gt(Expr::Literal(ScalarValue::UInt64(Some(5)))));
+ .or(col("bigint_col").gt(Expr::Literal(ScalarValue::Int64(Some(5)))));
assert!(can_expr_be_pushed_down_with_schemas(
&expr,
diff --git a/datafusion/datasource/src/schema_adapter.rs
b/datafusion/datasource/src/schema_adapter.rs
index e3a4ea4918..4164cda8cb 100644
--- a/datafusion/datasource/src/schema_adapter.rs
+++ b/datafusion/datasource/src/schema_adapter.rs
@@ -96,19 +96,6 @@ pub trait SchemaAdapter: Send + Sync {
pub trait SchemaMapper: Debug + Send + Sync {
/// Adapts a `RecordBatch` to match the `table_schema`
fn map_batch(&self, batch: RecordBatch) ->
datafusion_common::Result<RecordBatch>;
-
- /// Adapts a [`RecordBatch`] that does not have all the columns from the
- /// file schema.
- ///
- /// This method is used, for example, when applying a filter to a subset
of
- /// the columns as part of `DataFusionArrowPredicate` when
`filter_pushdown`
- /// is enabled.
- ///
- /// This method is slower than `map_batch` as it looks up columns by name.
- fn map_partial_batch(
- &self,
- batch: RecordBatch,
- ) -> datafusion_common::Result<RecordBatch>;
}
/// Default [`SchemaAdapterFactory`] for mapping schemas.
@@ -215,11 +202,10 @@ impl SchemaAdapterFactory for DefaultSchemaAdapterFactory
{
fn create(
&self,
projected_table_schema: SchemaRef,
- table_schema: SchemaRef,
+ _table_schema: SchemaRef,
) -> Box<dyn SchemaAdapter> {
Box::new(DefaultSchemaAdapter {
projected_table_schema,
- table_schema,
})
}
}
@@ -231,12 +217,6 @@ pub(crate) struct DefaultSchemaAdapter {
/// The schema for the table, projected to include only the fields being
output (projected) by the
/// associated ParquetSource
projected_table_schema: SchemaRef,
- /// The entire table schema for the table we're using this to adapt.
- ///
- /// This is used to evaluate any filters pushed down into the scan
- /// which may refer to columns that are not referred to anywhere
- /// else in the plan.
- table_schema: SchemaRef,
}
impl SchemaAdapter for DefaultSchemaAdapter {
@@ -290,7 +270,6 @@ impl SchemaAdapter for DefaultSchemaAdapter {
Arc::new(SchemaMapping {
projected_table_schema:
Arc::clone(&self.projected_table_schema),
field_mappings,
- table_schema: Arc::clone(&self.table_schema),
}),
projection,
))
@@ -300,27 +279,12 @@ impl SchemaAdapter for DefaultSchemaAdapter {
/// The SchemaMapping struct holds a mapping from the file schema to the table
/// schema and any necessary type conversions.
///
-/// Note, because `map_batch` and `map_partial_batch` functions have different
-/// needs, this struct holds two schemas:
-///
-/// 1. The projected **table** schema
-/// 2. The full table schema
-///
/// [`map_batch`] is used by the ParquetOpener to produce a RecordBatch which
/// has the projected schema, since that's the schema which is supposed to come
/// out of the execution of this query. Thus `map_batch` uses
/// `projected_table_schema` as it can only operate on the projected fields.
///
-/// [`map_partial_batch`] is used to create a RecordBatch with a schema that
-/// can be used for Parquet predicate pushdown, meaning that it may contain
-/// fields which are not in the projected schema (as the fields that parquet
-/// pushdown filters operate can be completely distinct from the fields that
are
-/// projected (output) out of the ParquetSource). `map_partial_batch` thus uses
-/// `table_schema` to create the resulting RecordBatch (as it could be
operating
-/// on any fields in the schema).
-///
/// [`map_batch`]: Self::map_batch
-/// [`map_partial_batch`]: Self::map_partial_batch
#[derive(Debug)]
pub struct SchemaMapping {
/// The schema of the table. This is the expected schema after conversion
@@ -332,18 +296,12 @@ pub struct SchemaMapping {
/// They are Options instead of just plain `usize`s because the table could
/// have fields that don't exist in the file.
field_mappings: Vec<Option<usize>>,
- /// The entire table schema, as opposed to the projected_table_schema
(which
- /// only contains the columns that we are projecting out of this query).
- /// This contains all fields in the table, regardless of if they will be
- /// projected out or not.
- table_schema: SchemaRef,
}
impl SchemaMapper for SchemaMapping {
/// Adapts a `RecordBatch` to match the `projected_table_schema` using the
stored mapping and
- /// conversions. The produced RecordBatch has a schema that contains only
the projected
- /// columns, so if one needs a RecordBatch with a schema that references
columns which are not
- /// in the projected, it would be better to use `map_partial_batch`
+ /// conversions.
+ /// The produced RecordBatch has a schema that contains only the projected columns.
fn map_batch(&self, batch: RecordBatch) ->
datafusion_common::Result<RecordBatch> {
let batch_rows = batch.num_rows();
let batch_cols = batch.columns().to_vec();
@@ -376,54 +334,4 @@ impl SchemaMapper for SchemaMapping {
let record_batch = RecordBatch::try_new_with_options(schema, cols,
&options)?;
Ok(record_batch)
}
-
- /// Adapts a [`RecordBatch`]'s schema into one that has all the correct
output types and only
- /// contains the fields that exist in both the file schema and table
schema.
- ///
- /// Unlike `map_batch` this method also preserves the columns that
- /// may not appear in the final output (`projected_table_schema`) but may
- /// appear in push down predicates
- fn map_partial_batch(
- &self,
- batch: RecordBatch,
- ) -> datafusion_common::Result<RecordBatch> {
- let batch_cols = batch.columns().to_vec();
- let schema = batch.schema();
-
- // for each field in the batch's schema (which is based on a file, not
a table)...
- let (cols, fields) = schema
- .fields()
- .iter()
- .zip(batch_cols.iter())
- .flat_map(|(field, batch_col)| {
- self.table_schema
- // try to get the same field from the table schema that we
have stored in self
- .field_with_name(field.name())
- // and if we don't have it, that's fine, ignore it. This
may occur when we've
- // created an external table whose fields are a subset of
the fields in this
- // file, then tried to read data from the file into this
table. If that is the
- // case here, it's fine to ignore because we don't care
about this field
- // anyways
- .ok()
- // but if we do have it,
- .map(|table_field| {
- // try to cast it into the correct output type. we
don't want to ignore this
- // error, though, so it's propagated.
- cast(batch_col, table_field.data_type())
- // and if that works, return the field and column.
- .map(|new_col| (new_col, table_field.clone()))
- })
- })
- .collect::<Result<Vec<_>, _>>()?
- .into_iter()
- .unzip::<_, _, Vec<_>, Vec<_>>();
-
- // Necessary to handle empty batches
- let options =
RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
-
- let schema =
- Arc::new(Schema::new_with_metadata(fields,
schema.metadata().clone()));
- let record_batch = RecordBatch::try_new_with_options(schema, cols,
&options)?;
- Ok(record_batch)
- }
}
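To make the surviving `map_batch` path concrete, a hedged sketch (illustrative only, not taken from
this patch) of the default adapter mapping a file batch that is missing a table column; the missing
column comes back as all-NULL, which is exactly what the pushed-down predicates in the new parquet
tests above rely on:

    use std::sync::Arc;
    use arrow::array::{Array, ArrayRef, Int32Array};
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;
    use datafusion_datasource::schema_adapter::{
        DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory, SchemaMapper,
    };

    fn map_batch_example() -> datafusion_common::Result<()> {
        let table_schema = Arc::new(Schema::new(vec![
            Field::new("c1", DataType::Int32, true),
            Field::new("c2", DataType::Int32, true),
        ]));
        let file_schema = Schema::new(vec![Field::new("c1", DataType::Int32, true)]);

        let (mapper, _projection) = DefaultSchemaAdapterFactory
            .create(Arc::clone(&table_schema), Arc::clone(&table_schema))
            .map_schema(&file_schema)?;

        // A batch as it would be read from the file: only c1 is present.
        let c1: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
        let file_batch = RecordBatch::try_new(Arc::new(file_schema), vec![c1])?;

        // The mapped batch has the (projected) table schema; c2 is filled with NULLs,
        // so a pushed-down predicate like `c2 = 1` matches no rows.
        let mapped = mapper.map_batch(file_batch)?;
        assert_eq!(mapped.num_columns(), 2);
        assert_eq!(mapped.column(1).null_count(), 3);
        Ok(())
    }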
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]