This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch branch-52
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/branch-52 by this push:
new c0a6f238fc Add BatchAdapter to simplify using PhysicalExprAdapter / Projector (#19877)
c0a6f238fc is described below
commit c0a6f238fcdbfaa56c075f7309eb81fc11d497eb
Author: Andrew Lamb <[email protected]>
AuthorDate: Mon Jan 19 10:00:52 2026 -0500
Add BatchAdapter to simplify using PhysicalExprAdapter / Projector (#19877)
- part of https://github.com/apache/datafusion/issues/19784
- Brings https://github.com/apache/datafusion/pull/19716 from @adriangb
into `branch-52`
Co-authored-by: Adrian Garcia Badaracco <[email protected]>
---
datafusion/datasource/src/schema_adapter.rs | 24 +-
datafusion/physical-expr-adapter/src/lib.rs | 5 +-
.../physical-expr-adapter/src/schema_rewriter.rs | 348 +++++++++++++++++++++
3 files changed, 373 insertions(+), 4 deletions(-)
diff --git a/datafusion/datasource/src/schema_adapter.rs b/datafusion/datasource/src/schema_adapter.rs
index 3d0b06954e..c995fa58d6 100644
--- a/datafusion/datasource/src/schema_adapter.rs
+++ b/datafusion/datasource/src/schema_adapter.rs
@@ -115,10 +115,20 @@ pub trait SchemaMapper: Debug + Send + Sync {
/// Deprecated: Default [`SchemaAdapterFactory`] for mapping schemas.
///
-/// This struct has been removed. Use [`PhysicalExprAdapterFactory`] instead.
+/// This struct has been removed.
+///
+/// Use [`PhysicalExprAdapterFactory`] instead to customize scans via
+/// [`FileScanConfigBuilder`], i.e. if you had implemented a custom [`SchemaAdapter`]
+/// and passed that into [`FileScanConfigBuilder`] / [`ParquetSource`].
+/// Use [`BatchAdapter`] if you want to map a stream of [`RecordBatch`]es
+/// between one schema and another, i.e. if you were calling [`SchemaMapper::map_batch`] manually.
+///
/// See `upgrading.md` for more details.
///
/// [`PhysicalExprAdapterFactory`]: datafusion_physical_expr_adapter::PhysicalExprAdapterFactory
+/// [`FileScanConfigBuilder`]: crate::file_scan_config::FileScanConfigBuilder
+/// [`ParquetSource`]: https://docs.rs/datafusion-datasource-parquet/latest/datafusion_datasource_parquet/source/struct.ParquetSource.html
+/// [`BatchAdapter`]: datafusion_physical_expr_adapter::BatchAdapter
#[deprecated(
    since = "52.0.0",
    note = "DefaultSchemaAdapterFactory has been removed. Use PhysicalExprAdapterFactory instead. See upgrading.md for more details."
@@ -178,10 +188,20 @@ impl SchemaAdapter for DeprecatedSchemaAdapter {
/// Deprecated: The SchemaMapping struct held a mapping from the file schema to the table schema.
///
-/// This struct has been removed. Use [`PhysicalExprAdapterFactory`] instead.
+/// This struct has been removed.
+///
+/// Use [`PhysicalExprAdapterFactory`] instead to customize scans via
+/// [`FileScanConfigBuilder`], i.e. if you had implemented a custom [`SchemaAdapter`]
+/// and passed that into [`FileScanConfigBuilder`] / [`ParquetSource`].
+/// Use [`BatchAdapter`] if you want to map a stream of [`RecordBatch`]es
+/// between one schema and another, i.e. if you were calling [`SchemaMapper::map_batch`] manually.
+///
/// See `upgrading.md` for more details.
///
/// [`PhysicalExprAdapterFactory`]: datafusion_physical_expr_adapter::PhysicalExprAdapterFactory
+/// [`FileScanConfigBuilder`]: crate::file_scan_config::FileScanConfigBuilder
+/// [`ParquetSource`]: https://docs.rs/datafusion-datasource-parquet/latest/datafusion_datasource_parquet/source/struct.ParquetSource.html
+/// [`BatchAdapter`]: datafusion_physical_expr_adapter::BatchAdapter
#[deprecated(
    since = "52.0.0",
    note = "SchemaMapping has been removed. Use PhysicalExprAdapterFactory instead. See upgrading.md for more details."
diff --git a/datafusion/physical-expr-adapter/src/lib.rs b/datafusion/physical-expr-adapter/src/lib.rs
index d7c750e4a1..5ae86f219b 100644
--- a/datafusion/physical-expr-adapter/src/lib.rs
+++ b/datafusion/physical-expr-adapter/src/lib.rs
@@ -29,6 +29,7 @@
pub mod schema_rewriter;
pub use schema_rewriter::{
-    DefaultPhysicalExprAdapter, DefaultPhysicalExprAdapterFactory, PhysicalExprAdapter,
-    PhysicalExprAdapterFactory, replace_columns_with_literals,
+    BatchAdapter, BatchAdapterFactory, DefaultPhysicalExprAdapter,
+    DefaultPhysicalExprAdapterFactory, PhysicalExprAdapter, PhysicalExprAdapterFactory,
+    replace_columns_with_literals,
};
diff --git a/datafusion/physical-expr-adapter/src/schema_rewriter.rs b/datafusion/physical-expr-adapter/src/schema_rewriter.rs
index 83727ac092..b2bed36f0e 100644
--- a/datafusion/physical-expr-adapter/src/schema_rewriter.rs
+++ b/datafusion/physical-expr-adapter/src/schema_rewriter.rs
@@ -24,6 +24,7 @@ use std::collections::HashMap;
use std::hash::Hash;
use std::sync::Arc;
+use arrow::array::RecordBatch;
use arrow::compute::can_cast_types;
use arrow::datatypes::{DataType, Schema, SchemaRef};
use datafusion_common::{
@@ -32,12 +33,15 @@ use datafusion_common::{
    tree_node::{Transformed, TransformedResult, TreeNode},
};
use datafusion_functions::core::getfield::GetFieldFunc;
+use datafusion_physical_expr::PhysicalExprSimplifier;
use datafusion_physical_expr::expressions::CastColumnExpr;
+use datafusion_physical_expr::projection::{ProjectionExprs, Projector};
use datafusion_physical_expr::{
    ScalarFunctionExpr,
    expressions::{self, Column},
};
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
+use itertools::Itertools;
/// Replace column references in the given physical expression with literal values.
///
@@ -473,6 +477,141 @@ impl<'a> DefaultPhysicalExprAdapterRewriter<'a> {
    }
}
+/// Factory for creating [`BatchAdapter`] instances to adapt record batches
+/// to a target schema.
+///
+/// This binds a target schema and allows creating adapters for different source schemas.
+/// It handles:
+/// - **Column reordering**: Columns are reordered to match the target schema
+/// - **Type casting**: Automatic type conversion (e.g., Int32 to Int64)
+/// - **Missing columns**: Nullable columns missing from source are filled with nulls
+/// - **Struct field adaptation**: Nested struct fields are recursively adapted
+///
+/// ## Examples
+///
+/// ```rust
+/// use arrow::array::{Int32Array, Int64Array, StringArray, RecordBatch};
+/// use arrow::datatypes::{DataType, Field, Schema};
+/// use datafusion_physical_expr_adapter::BatchAdapterFactory;
+/// use std::sync::Arc;
+///
+/// // Target schema has different column order and types
+/// let target_schema = Arc::new(Schema::new(vec![
+///     Field::new("name", DataType::Utf8, true),
+///     Field::new("id", DataType::Int64, false), // Int64 in target
+///     Field::new("score", DataType::Float64, true), // Missing from source
+/// ]));
+///
+/// // Source schema has different column order and Int32 for id
+/// let source_schema = Arc::new(Schema::new(vec![
+///     Field::new("id", DataType::Int32, false), // Int32 in source
+///     Field::new("name", DataType::Utf8, true),
+///     // Note: 'score' column is missing from source
+/// ]));
+///
+/// // Create factory with target schema
+/// let factory = BatchAdapterFactory::new(Arc::clone(&target_schema));
+///
+/// // Create adapter for this specific source schema
+/// let adapter = factory.make_adapter(Arc::clone(&source_schema)).unwrap();
+///
+/// // Create a source batch
+/// let source_batch = RecordBatch::try_new(
+///     source_schema,
+///     vec![
+///         Arc::new(Int32Array::from(vec![1, 2, 3])),
+///         Arc::new(StringArray::from(vec!["Alice", "Bob", "Carol"])),
+///     ],
+/// ).unwrap();
+///
+/// // Adapt the batch to match target schema
+/// let adapted = adapter.adapt_batch(&source_batch).unwrap();
+///
+/// assert_eq!(adapted.num_columns(), 3);
+/// assert_eq!(adapted.column(0).data_type(), &DataType::Utf8); // name
+/// assert_eq!(adapted.column(1).data_type(), &DataType::Int64); // id (cast from Int32)
+/// assert_eq!(adapted.column(2).data_type(), &DataType::Float64); // score (filled with nulls)
+/// ```
+#[derive(Debug)]
+pub struct BatchAdapterFactory {
+    target_schema: SchemaRef,
+    expr_adapter_factory: Arc<dyn PhysicalExprAdapterFactory>,
+}
+
+impl BatchAdapterFactory {
+    /// Create a new [`BatchAdapterFactory`] with the given target schema.
+    pub fn new(target_schema: SchemaRef) -> Self {
+        let expr_adapter_factory = Arc::new(DefaultPhysicalExprAdapterFactory);
+        Self {
+            target_schema,
+            expr_adapter_factory,
+        }
+    }
+
+    /// Set a custom [`PhysicalExprAdapterFactory`] to use when adapting expressions.
+    ///
+    /// Use this to customize behavior when adapting batches, e.g. to fill in missing values
+    /// with defaults instead of nulls.
+    ///
+    /// See [`PhysicalExprAdapter`] for more details.
+    pub fn with_adapter_factory(
+        self,
+        factory: Arc<dyn PhysicalExprAdapterFactory>,
+    ) -> Self {
+        Self {
+            expr_adapter_factory: factory,
+            ..self
+        }
+    }
+
+    /// Create a new [`BatchAdapter`] for the given source schema.
+    ///
+    /// Batches fed into this [`BatchAdapter`] *must* conform to the source schema;
+    /// no validation is performed at runtime, to minimize overhead.
+    pub fn make_adapter(&self, source_schema: SchemaRef) -> Result<BatchAdapter> {
+        let expr_adapter = self
+            .expr_adapter_factory
+            .create(Arc::clone(&self.target_schema), Arc::clone(&source_schema));
+
+        let simplifier = PhysicalExprSimplifier::new(&self.target_schema);
+
+        let projection = ProjectionExprs::from_indices(
+            &(0..self.target_schema.fields().len()).collect_vec(),
+            &self.target_schema,
+        );
+
+        let adapted = projection
+            .try_map_exprs(|e| simplifier.simplify(expr_adapter.rewrite(e)?))?;
+        let projector = adapted.make_projector(&source_schema)?;
+
+        Ok(BatchAdapter { projector })
+    }
+}
+
+/// Adapter for transforming record batches to match a target schema.
+///
+/// Create instances via [`BatchAdapterFactory`].
+///
+/// ## Performance
+///
+/// The adapter pre-computes the projection expressions during creation,
+/// so the [`adapt_batch`](BatchAdapter::adapt_batch) call is efficient and suitable
+/// for use in hot paths like streaming file scans.
+#[derive(Debug)]
+pub struct BatchAdapter {
+    projector: Projector,
+}
+
+impl BatchAdapter {
+    /// Adapt the given record batch to match the target schema.
+    ///
+    /// The input batch *must* conform to the source schema used when
+    /// creating this adapter.
+    pub fn adapt_batch(&self, batch: &RecordBatch) -> Result<RecordBatch> {
+        self.projector.project_batch(batch)
+    }
+}
+
#[cfg(test)]
mod tests {
use super::*;
@@ -1046,4 +1185,213 @@ mod tests {
    // with ScalarUDF, which is complex to set up in a unit test. The integration tests in
    // datafusion/core/tests/parquet/schema_adapter.rs provide better coverage for this functionality.
}
+
+    // ========================================================================
+    // BatchAdapterFactory and BatchAdapter tests
+    // ========================================================================
+
+    #[test]
+    fn test_batch_adapter_factory_basic() {
+        // Target schema
+        let target_schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int64, false),
+            Field::new("b", DataType::Utf8, true),
+        ]));
+
+        // Source schema with different column order and type
+        let source_schema = Arc::new(Schema::new(vec![
+            Field::new("b", DataType::Utf8, true),
+            Field::new("a", DataType::Int32, false), // Int32 -> Int64
+        ]));
+
+        let factory = BatchAdapterFactory::new(Arc::clone(&target_schema));
+        let adapter = factory.make_adapter(Arc::clone(&source_schema)).unwrap();
+
+        // Create source batch
+        let source_batch = RecordBatch::try_new(
+            Arc::clone(&source_schema),
+            vec![
+                Arc::new(StringArray::from(vec![Some("hello"), None, Some("world")])),
+                Arc::new(Int32Array::from(vec![1, 2, 3])),
+            ],
+        )
+        .unwrap();
+
+        let adapted = adapter.adapt_batch(&source_batch).unwrap();
+
+        // Verify schema matches target
+        assert_eq!(adapted.num_columns(), 2);
+        assert_eq!(adapted.schema().field(0).name(), "a");
+        assert_eq!(adapted.schema().field(0).data_type(), &DataType::Int64);
+        assert_eq!(adapted.schema().field(1).name(), "b");
+        assert_eq!(adapted.schema().field(1).data_type(), &DataType::Utf8);
+
+        // Verify data
+        let col_a = adapted
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int64Array>()
+            .unwrap();
+        assert_eq!(col_a.iter().collect_vec(), vec![Some(1), Some(2), Some(3)]);
+
+        let col_b = adapted
+            .column(1)
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap();
+        assert_eq!(
+            col_b.iter().collect_vec(),
+            vec![Some("hello"), None, Some("world")]
+        );
+    }
+
+    #[test]
+    fn test_batch_adapter_factory_missing_column() {
+        // Target schema with a column missing from source
+        let target_schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int32, false),
+            Field::new("b", DataType::Utf8, true), // exists in source
+            Field::new("c", DataType::Float64, true), // missing from source
+        ]));
+
+        let source_schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int32, false),
+            Field::new("b", DataType::Utf8, true),
+        ]));
+
+        let factory = BatchAdapterFactory::new(Arc::clone(&target_schema));
+        let adapter = factory.make_adapter(Arc::clone(&source_schema)).unwrap();
+
+        let source_batch = RecordBatch::try_new(
+            Arc::clone(&source_schema),
+            vec![
+                Arc::new(Int32Array::from(vec![1, 2])),
+                Arc::new(StringArray::from(vec!["x", "y"])),
+            ],
+        )
+        .unwrap();
+
+        let adapted = adapter.adapt_batch(&source_batch).unwrap();
+
+        assert_eq!(adapted.num_columns(), 3);
+
+        // Missing column should be filled with nulls
+        let col_c = adapted.column(2);
+        assert_eq!(col_c.data_type(), &DataType::Float64);
+        assert_eq!(col_c.null_count(), 2); // All nulls
+    }
+
+    #[test]
+    fn test_batch_adapter_factory_with_struct() {
+        // Target has struct with Int64 id
+        let target_struct_fields: Fields = vec![
+            Field::new("id", DataType::Int64, false),
+            Field::new("name", DataType::Utf8, true),
+        ]
+        .into();
+        let target_schema = Arc::new(Schema::new(vec![Field::new(
+            "data",
+            DataType::Struct(target_struct_fields),
+            false,
+        )]));
+
+        // Source has struct with Int32 id
+        let source_struct_fields: Fields = vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("name", DataType::Utf8, true),
+        ]
+        .into();
+        let source_schema = Arc::new(Schema::new(vec![Field::new(
+            "data",
+            DataType::Struct(source_struct_fields.clone()),
+            false,
+        )]));
+
+        let struct_array = StructArray::new(
+            source_struct_fields,
+            vec![
+                Arc::new(Int32Array::from(vec![10, 20])) as _,
+                Arc::new(StringArray::from(vec!["a", "b"])) as _,
+            ],
+            None,
+        );
+
+        let source_batch = RecordBatch::try_new(
+            Arc::clone(&source_schema),
+            vec![Arc::new(struct_array)],
+        )
+        .unwrap();
+
+        let factory = BatchAdapterFactory::new(Arc::clone(&target_schema));
+        let adapter = factory.make_adapter(source_schema).unwrap();
+        let adapted = adapter.adapt_batch(&source_batch).unwrap();
+
+        let result_struct = adapted
+            .column(0)
+            .as_any()
+            .downcast_ref::<StructArray>()
+            .unwrap();
+
+        // Verify id was cast to Int64
+        let id_col = result_struct.column_by_name("id").unwrap();
+        assert_eq!(id_col.data_type(), &DataType::Int64);
+        let id_values = id_col.as_any().downcast_ref::<Int64Array>().unwrap();
+        assert_eq!(id_values.iter().collect_vec(), vec![Some(10), Some(20)]);
+    }
+
+    #[test]
+    fn test_batch_adapter_factory_identity() {
+        // When source and target schemas are identical, should pass through efficiently
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int32, false),
+            Field::new("b", DataType::Utf8, true),
+        ]));
+
+        let factory = BatchAdapterFactory::new(Arc::clone(&schema));
+        let adapter = factory.make_adapter(Arc::clone(&schema)).unwrap();
+
+        let batch = RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![
+                Arc::new(Int32Array::from(vec![1, 2, 3])),
+                Arc::new(StringArray::from(vec!["a", "b", "c"])),
+            ],
+        )
+        .unwrap();
+
+        let adapted = adapter.adapt_batch(&batch).unwrap();
+
+        assert_eq!(adapted.num_columns(), 2);
+        assert_eq!(adapted.schema().field(0).data_type(), &DataType::Int32);
+        assert_eq!(adapted.schema().field(1).data_type(), &DataType::Utf8);
+    }
+
+    #[test]
+    fn test_batch_adapter_factory_reuse() {
+        // Factory can create multiple adapters for different source schemas
+        let target_schema = Arc::new(Schema::new(vec![
+            Field::new("x", DataType::Int64, false),
+            Field::new("y", DataType::Utf8, true),
+        ]));
+
+        let factory = BatchAdapterFactory::new(Arc::clone(&target_schema));
+
+        // First source schema
+        let source1 = Arc::new(Schema::new(vec![
+            Field::new("x", DataType::Int32, false),
+            Field::new("y", DataType::Utf8, true),
+        ]));
+        let adapter1 = factory.make_adapter(source1).unwrap();
+
+        // Second source schema (different order)
+        let source2 = Arc::new(Schema::new(vec![
+            Field::new("y", DataType::Utf8, true),
+            Field::new("x", DataType::Int64, false),
+        ]));
+        let adapter2 = factory.make_adapter(source2).unwrap();
+
+        // Both should work correctly
+        assert!(format!("{:?}", adapter1).contains("BatchAdapter"));
+        assert!(format!("{:?}", adapter2).contains("BatchAdapter"));
+    }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]