This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new e01fa0ce42 Cut `Parquet` over to PhysicalExprAdapter, remove 
`SchemaAdapter` (#18998)
e01fa0ce42 is described below

commit e01fa0ce42add38d0e1894fb970dc9f8c3742a43
Author: Adrian Garcia Badaracco <[email protected]>
AuthorDate: Fri Dec 5 08:36:00 2025 +0100

    Cut `Parquet` over to PhysicalExprAdapter, remove `SchemaAdapter` (#18998)
    
    Chips away at https://github.com/apache/datafusion/issues/14993 and
    https://github.com/apache/datafusion/issues/16800
    
    Changes made in this PR:
    - Update DefaultPhysicalExprAdapter to handle struct column evolution
    - Remove use of SchemaAdapter from row filter / predicate pushdown /
    late materialization. I believe it was already not doing much here other
    than where tests checked specific behavior and had not been updated to
    use PhysicalExprAdapter
    - Changed projection handling to use `PhysicalExprAdapter` instead of
    `SchemaAdapter`
    - Kept intermediary `Vec<usize>` so we can use `ProjectionMask::roots`
    and punt the complexity of implementing `ProjectionExprs` ->
    `ProjectionMask` until a later PR (there is a draft in #18966 of what
    that might look like).
---
 datafusion/core/src/datasource/mod.rs              | 122 +++--
 .../core/src/datasource/physical_plan/parquet.rs   |   4 +-
 datafusion/core/tests/parquet/schema_adapter.rs    | 390 ++++-----------
 .../schema_adapter_integration_tests.rs            | 540 +++++++++++++++++----
 datafusion/datasource-parquet/src/file_format.rs   |   5 +-
 datafusion/datasource-parquet/src/opener.rs        | 243 ++--------
 datafusion/datasource-parquet/src/row_filter.rs    | 127 ++---
 datafusion/datasource-parquet/src/source.rs        |  91 +---
 .../physical-expr-adapter/src/schema_rewriter.rs   | 211 ++++++--
 datafusion/physical-expr/src/expressions/cast.rs   |   3 +
 datafusion/physical-expr/src/projection.rs         |  43 ++
 datafusion/pruning/src/pruning_predicate.rs        |  15 +
 datafusion/sqllogictest/test_files/timestamps.slt  |  70 +++
 docs/source/library-user-guide/upgrading.md        |  11 +-
 14 files changed, 1020 insertions(+), 855 deletions(-)

diff --git a/datafusion/core/src/datasource/mod.rs 
b/datafusion/core/src/datasource/mod.rs
index 620e389a0f..28faea9a68 100644
--- a/datafusion/core/src/datasource/mod.rs
+++ b/datafusion/core/src/datasource/mod.rs
@@ -55,30 +55,35 @@ mod tests {
     use crate::prelude::SessionContext;
     use ::object_store::{path::Path, ObjectMeta};
     use arrow::{
-        array::{Int32Array, StringArray},
-        datatypes::{DataType, Field, Schema, SchemaRef},
+        array::Int32Array,
+        datatypes::{DataType, Field, FieldRef, Schema, SchemaRef},
         record_batch::RecordBatch,
     };
-    use datafusion_common::{record_batch, test_util::batches_to_sort_string};
+    use datafusion_common::{
+        record_batch,
+        test_util::batches_to_sort_string,
+        tree_node::{Transformed, TransformedResult, TreeNode},
+        Result, ScalarValue,
+    };
     use datafusion_datasource::{
-        file::FileSource,
         file_scan_config::FileScanConfigBuilder,
-        schema_adapter::{
-            DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory,
-            SchemaMapper,
-        },
-        source::DataSourceExec,
+        schema_adapter::DefaultSchemaAdapterFactory, source::DataSourceExec,
         PartitionedFile,
     };
     use datafusion_datasource_parquet::source::ParquetSource;
+    use datafusion_physical_expr::expressions::{Column, Literal};
+    use datafusion_physical_expr_adapter::{
+        PhysicalExprAdapter, PhysicalExprAdapterFactory,
+    };
+    use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
     use datafusion_physical_plan::collect;
     use std::{fs, sync::Arc};
     use tempfile::TempDir;
 
     #[tokio::test]
-    async fn can_override_schema_adapter() {
-        // Test shows that SchemaAdapter can add a column that doesn't 
existing in the
-        // record batches returned from parquet.  This can be useful for 
schema evolution
+    async fn can_override_physical_expr_adapter() {
+        // Test shows that PhysicalExprAdapter can add a column that doesn't 
exist in the
+        // record batches returned from parquet. This can be useful for schema 
evolution
         // where older files may not have all columns.
 
         use datafusion_execution::object_store::ObjectStoreUrl;
@@ -124,12 +129,11 @@ mod tests {
         let f2 = Field::new("extra_column", DataType::Utf8, true);
 
         let schema = Arc::new(Schema::new(vec![f1.clone(), f2.clone()]));
-        let source = ParquetSource::new(Arc::clone(&schema))
-            .with_schema_adapter_factory(Arc::new(TestSchemaAdapterFactory {}))
-            .unwrap();
+        let source = Arc::new(ParquetSource::new(Arc::clone(&schema)));
         let base_conf =
             FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), 
source)
                 .with_file(partitioned_file)
+                
.with_expr_adapter(Some(Arc::new(TestPhysicalExprAdapterFactory)))
                 .build();
 
         let parquet_exec = DataSourceExec::from_data_source(base_conf);
@@ -200,72 +204,54 @@ mod tests {
     }
 
     #[derive(Debug)]
-    struct TestSchemaAdapterFactory;
+    struct TestPhysicalExprAdapterFactory;
 
-    impl SchemaAdapterFactory for TestSchemaAdapterFactory {
+    impl PhysicalExprAdapterFactory for TestPhysicalExprAdapterFactory {
         fn create(
             &self,
-            projected_table_schema: SchemaRef,
-            _table_schema: SchemaRef,
-        ) -> Box<dyn SchemaAdapter> {
-            Box::new(TestSchemaAdapter {
-                table_schema: projected_table_schema,
+            logical_file_schema: SchemaRef,
+            physical_file_schema: SchemaRef,
+        ) -> Arc<dyn PhysicalExprAdapter> {
+            Arc::new(TestPhysicalExprAdapter {
+                logical_file_schema,
+                physical_file_schema,
             })
         }
     }
 
-    struct TestSchemaAdapter {
-        /// Schema for the table
-        table_schema: SchemaRef,
+    #[derive(Debug)]
+    struct TestPhysicalExprAdapter {
+        logical_file_schema: SchemaRef,
+        physical_file_schema: SchemaRef,
     }
 
-    impl SchemaAdapter for TestSchemaAdapter {
-        fn map_column_index(&self, index: usize, file_schema: &Schema) -> 
Option<usize> {
-            let field = self.table_schema.field(index);
-            Some(file_schema.fields.find(field.name())?.0)
-        }
-
-        fn map_schema(
-            &self,
-            file_schema: &Schema,
-        ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
-            let mut projection = 
Vec::with_capacity(file_schema.fields().len());
-
-            for (file_idx, file_field) in 
file_schema.fields.iter().enumerate() {
-                if 
self.table_schema.fields().find(file_field.name()).is_some() {
-                    projection.push(file_idx);
+    impl PhysicalExprAdapter for TestPhysicalExprAdapter {
+        fn rewrite(&self, expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn 
PhysicalExpr>> {
+            expr.transform(|e| {
+                if let Some(column) = e.as_any().downcast_ref::<Column>() {
+                    // If column is "extra_column" and missing from physical 
schema, inject "foo"
+                    if column.name() == "extra_column"
+                        && 
self.physical_file_schema.index_of("extra_column").is_err()
+                    {
+                        return Ok(Transformed::yes(Arc::new(Literal::new(
+                            ScalarValue::Utf8(Some("foo".to_string())),
+                        ))
+                            as Arc<dyn PhysicalExpr>));
+                    }
                 }
-            }
-
-            Ok((Arc::new(TestSchemaMapping {}), projection))
-        }
-    }
-
-    #[derive(Debug)]
-    struct TestSchemaMapping {}
-
-    impl SchemaMapper for TestSchemaMapping {
-        fn map_batch(
-            &self,
-            batch: RecordBatch,
-        ) -> datafusion_common::Result<RecordBatch> {
-            let f1 = Field::new("id", DataType::Int32, true);
-            let f2 = Field::new("extra_column", DataType::Utf8, true);
-
-            let schema = Arc::new(Schema::new(vec![f1, f2]));
-
-            let extra_column = Arc::new(StringArray::from(vec!["foo"]));
-            let mut new_columns = batch.columns().to_vec();
-            new_columns.push(extra_column);
-
-            Ok(RecordBatch::try_new(schema, new_columns).unwrap())
+                Ok(Transformed::no(e))
+            })
+            .data()
         }
 
-        fn map_column_statistics(
+        fn with_partition_values(
             &self,
-            _file_col_statistics: &[datafusion_common::ColumnStatistics],
-        ) -> 
datafusion_common::Result<Vec<datafusion_common::ColumnStatistics>> {
-            unimplemented!()
+            _partition_values: Vec<(FieldRef, ScalarValue)>,
+        ) -> Arc<dyn PhysicalExprAdapter> {
+            Arc::new(TestPhysicalExprAdapter {
+                logical_file_schema: self.logical_file_schema.clone(),
+                physical_file_schema: self.physical_file_schema.clone(),
+            })
         }
     }
 }
diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs 
b/datafusion/core/src/datasource/physical_plan/parquet.rs
index 4d7c4a8788..6ed01cde14 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -1257,7 +1257,7 @@ mod tests {
             ("c3", c3.clone()),
         ]);
 
-        // batch2: c3(int8), c2(int64), c1(string), c4(string)
+        // batch2: c3(date64), c2(int64), c1(string)
         let batch2 = create_batch(vec![("c3", c4), ("c2", c2), ("c1", c1)]);
 
         let table_schema = Schema::new(vec![
@@ -1272,7 +1272,7 @@ mod tests {
             .round_trip_to_batches(vec![batch1, batch2])
             .await;
         assert_contains!(read.unwrap_err().to_string(),
-            "Cannot cast file schema field c3 of type Date64 to table schema 
field of type Int8");
+            "Cannot cast column 'c3' from 'Date64' (physical data type) to 
'Int8' (logical data type)");
     }
 
     #[tokio::test]
diff --git a/datafusion/core/tests/parquet/schema_adapter.rs 
b/datafusion/core/tests/parquet/schema_adapter.rs
index 0e76d626aa..541785319c 100644
--- a/datafusion/core/tests/parquet/schema_adapter.rs
+++ b/datafusion/core/tests/parquet/schema_adapter.rs
@@ -17,8 +17,7 @@
 
 use std::sync::Arc;
 
-use arrow::array::{record_batch, RecordBatch, RecordBatchOptions};
-use arrow::compute::{cast_with_options, CastOptions};
+use arrow::array::{record_batch, RecordBatch};
 use arrow_schema::{DataType, Field, FieldRef, Schema, SchemaRef};
 use bytes::{BufMut, BytesMut};
 use datafusion::assert_batches_eq;
@@ -29,14 +28,8 @@ use datafusion::datasource::listing::{
 use datafusion::prelude::{SessionConfig, SessionContext};
 use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
 use datafusion_common::DataFusionError;
-use datafusion_common::{ColumnStatistics, ScalarValue};
-use datafusion_datasource::file::FileSource;
-use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
-use datafusion_datasource::schema_adapter::{
-    DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory, 
SchemaMapper,
-};
+use datafusion_common::ScalarValue;
 use datafusion_datasource::ListingTableUrl;
-use datafusion_datasource_parquet::source::ParquetSource;
 use datafusion_execution::object_store::ObjectStoreUrl;
 use datafusion_physical_expr::expressions::{self, Column};
 use datafusion_physical_expr::PhysicalExpr;
@@ -44,7 +37,6 @@ use datafusion_physical_expr_adapter::{
     DefaultPhysicalExprAdapter, DefaultPhysicalExprAdapterFactory, 
PhysicalExprAdapter,
     PhysicalExprAdapterFactory,
 };
-use itertools::Itertools;
 use object_store::{memory::InMemory, path::Path, ObjectStore};
 use parquet::arrow::ArrowWriter;
 
@@ -59,98 +51,10 @@ async fn write_parquet(batch: RecordBatch, store: Arc<dyn 
ObjectStore>, path: &s
     store.put(&Path::from(path), data.into()).await.unwrap();
 }
 
-#[derive(Debug)]
-struct CustomSchemaAdapterFactory;
-
-impl SchemaAdapterFactory for CustomSchemaAdapterFactory {
-    fn create(
-        &self,
-        projected_table_schema: SchemaRef,
-        _table_schema: SchemaRef,
-    ) -> Box<dyn SchemaAdapter> {
-        Box::new(CustomSchemaAdapter {
-            logical_file_schema: projected_table_schema,
-        })
-    }
-}
-
-#[derive(Debug)]
-struct CustomSchemaAdapter {
-    logical_file_schema: SchemaRef,
-}
-
-impl SchemaAdapter for CustomSchemaAdapter {
-    fn map_column_index(&self, index: usize, file_schema: &Schema) -> 
Option<usize> {
-        for (idx, field) in file_schema.fields().iter().enumerate() {
-            if field.name() == self.logical_file_schema.field(index).name() {
-                return Some(idx);
-            }
-        }
-        None
-    }
-
-    fn map_schema(
-        &self,
-        file_schema: &Schema,
-    ) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
-        let projection = (0..file_schema.fields().len()).collect_vec();
-        Ok((
-            Arc::new(CustomSchemaMapper {
-                logical_file_schema: Arc::clone(&self.logical_file_schema),
-            }),
-            projection,
-        ))
-    }
-}
-
-#[derive(Debug)]
-struct CustomSchemaMapper {
-    logical_file_schema: SchemaRef,
-}
-
-impl SchemaMapper for CustomSchemaMapper {
-    fn map_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
-        let mut output_columns =
-            Vec::with_capacity(self.logical_file_schema.fields().len());
-        for field in self.logical_file_schema.fields() {
-            if let Some(array) = batch.column_by_name(field.name()) {
-                output_columns.push(cast_with_options(
-                    array,
-                    field.data_type(),
-                    &CastOptions::default(),
-                )?);
-            } else {
-                // Create a new array with the default value for the field type
-                let default_value = match field.data_type() {
-                    DataType::Int64 => ScalarValue::Int64(Some(0)),
-                    DataType::Utf8 => ScalarValue::Utf8(Some("a".to_string())),
-                    _ => unimplemented!("Unsupported data type: {}", 
field.data_type()),
-                };
-                output_columns
-                    
.push(default_value.to_array_of_size(batch.num_rows()).unwrap());
-            }
-        }
-        let batch = RecordBatch::try_new_with_options(
-            Arc::clone(&self.logical_file_schema),
-            output_columns,
-            &RecordBatchOptions::new().with_row_count(Some(batch.num_rows())),
-        )
-        .unwrap();
-        Ok(batch)
-    }
-
-    fn map_column_statistics(
-        &self,
-        _file_col_statistics: &[ColumnStatistics],
-    ) -> Result<Vec<ColumnStatistics>> {
-        Ok(vec![
-            ColumnStatistics::new_unknown();
-            self.logical_file_schema.fields().len()
-        ])
-    }
-}
-
-// Implement a custom PhysicalExprAdapterFactory that fills in missing columns 
with the default value for the field type
+// Implement a custom PhysicalExprAdapterFactory that fills in missing columns 
with
+// the default value for the field type:
+// - Int64 columns are filled with `1`
+// - Utf8 columns are filled with `'b'`
 #[derive(Debug)]
 struct CustomPhysicalExprAdapterFactory;
 
@@ -264,13 +168,13 @@ async fn 
test_custom_schema_adapter_and_custom_expression_adapter() {
     );
     assert!(!ctx.state().config().collect_statistics());
 
+    // Test with DefaultPhysicalExprAdapterFactory - missing columns are 
filled with NULL
     let listing_table_config =
         ListingTableConfig::new(ListingTableUrl::parse("memory:///").unwrap())
             .infer_options(&ctx.state())
             .await
             .unwrap()
             .with_schema(table_schema.clone())
-            .with_schema_adapter_factory(Arc::new(DefaultSchemaAdapterFactory))
             
.with_expr_adapter_factory(Arc::new(DefaultPhysicalExprAdapterFactory));
 
     let table = ListingTable::try_new(listing_table_config).unwrap();
@@ -293,36 +197,72 @@ async fn 
test_custom_schema_adapter_and_custom_expression_adapter() {
     ];
     assert_batches_eq!(expected, &batches);
 
-    // Test using a custom schema adapter and no explicit physical expr adapter
-    // This should use the custom schema adapter both for projections and 
predicate pushdown
+    // Test with a custom physical expr adapter
+    // PhysicalExprAdapterFactory now handles both predicates AND projections
+    // CustomPhysicalExprAdapterFactory fills missing columns with 'b' for Utf8
     let listing_table_config =
         ListingTableConfig::new(ListingTableUrl::parse("memory:///").unwrap())
             .infer_options(&ctx.state())
             .await
             .unwrap()
             .with_schema(table_schema.clone())
-            .with_schema_adapter_factory(Arc::new(CustomSchemaAdapterFactory));
+            
.with_expr_adapter_factory(Arc::new(CustomPhysicalExprAdapterFactory));
     let table = ListingTable::try_new(listing_table_config).unwrap();
     ctx.deregister_table("t").unwrap();
     ctx.register_table("t", Arc::new(table)).unwrap();
     let batches = ctx
-        .sql("SELECT c2, c1 FROM t WHERE c1 = 2 AND c2 = 'a'")
+        .sql("SELECT c2, c1 FROM t WHERE c1 = 2 AND c2 = 'b'")
         .await
         .unwrap()
         .collect()
         .await
         .unwrap();
+    // With CustomPhysicalExprAdapterFactory, missing column c2 is filled with 
'b'
+    // in both the predicate (c2 = 'b' becomes 'b' = 'b' -> true) and the 
projection
     let expected = [
         "+----+----+",
         "| c2 | c1 |",
         "+----+----+",
-        "| a  | 2  |",
+        "| b  | 2  |",
         "+----+----+",
     ];
     assert_batches_eq!(expected, &batches);
+}
+
+/// Test demonstrating how to implement a custom PhysicalExprAdapterFactory
+/// that fills missing columns with non-null default values.
+///
+/// This is the recommended migration path for users who previously used
+/// SchemaAdapterFactory to fill missing columns with default values.
+/// Instead of transforming batches after reading (SchemaAdapter::map_batch),
+/// the PhysicalExprAdapterFactory rewrites expressions to use literals for
+/// missing columns, achieving the same result more efficiently.
+#[tokio::test]
+async fn test_physical_expr_adapter_with_non_null_defaults() {
+    // File only has c1 column
+    let batch = record_batch!(("c1", Int32, [10, 20, 30])).unwrap();
+
+    let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+    let store_url = ObjectStoreUrl::parse("memory://").unwrap();
+    write_parquet(batch, store.clone(), "defaults_test.parquet").await;
+
+    // Table schema has additional columns c2 (Utf8) and c3 (Int64) that don't 
exist in file
+    let table_schema = Arc::new(Schema::new(vec![
+        Field::new("c1", DataType::Int64, false), // type differs from file 
(Int32 vs Int64)
+        Field::new("c2", DataType::Utf8, true),   // missing from file
+        Field::new("c3", DataType::Int64, true),  // missing from file
+    ]));
+
+    let mut cfg = SessionConfig::new()
+        .with_collect_statistics(false)
+        .with_parquet_pruning(false);
+    cfg.options_mut().execution.parquet.pushdown_filters = true;
+    let ctx = SessionContext::new_with_config(cfg);
+    ctx.register_object_store(store_url.as_ref(), Arc::clone(&store));
 
-    // Do the same test but with a custom physical expr adapter
-    // Now the default schema adapter will be used for projections, but the 
custom physical expr adapter will be used for predicate pushdown
+    // CustomPhysicalExprAdapterFactory fills:
+    // - missing Utf8 columns with 'b'
+    // - missing Int64 columns with 1
     let listing_table_config =
         ListingTableConfig::new(ListingTableUrl::parse("memory:///").unwrap())
             .infer_options(&ctx.state())
@@ -330,218 +270,66 @@ async fn 
test_custom_schema_adapter_and_custom_expression_adapter() {
             .unwrap()
             .with_schema(table_schema.clone())
             
.with_expr_adapter_factory(Arc::new(CustomPhysicalExprAdapterFactory));
+
     let table = ListingTable::try_new(listing_table_config).unwrap();
-    ctx.deregister_table("t").unwrap();
     ctx.register_table("t", Arc::new(table)).unwrap();
+
+    // Query all columns - missing columns should have default values
     let batches = ctx
-        .sql("SELECT c2, c1 FROM t WHERE c1 = 2 AND c2 = 'b'")
+        .sql("SELECT c1, c2, c3 FROM t ORDER BY c1")
         .await
         .unwrap()
         .collect()
         .await
         .unwrap();
+
+    // c1 is cast from Int32 to Int64, c2 defaults to 'b', c3 defaults to 1
     let expected = [
-        "+----+----+",
-        "| c2 | c1 |",
-        "+----+----+",
-        "|    | 2  |",
-        "+----+----+",
+        "+----+----+----+",
+        "| c1 | c2 | c3 |",
+        "+----+----+----+",
+        "| 10 | b  | 1  |",
+        "| 20 | b  | 1  |",
+        "| 30 | b  | 1  |",
+        "+----+----+----+",
     ];
     assert_batches_eq!(expected, &batches);
 
-    // If we use both then the custom physical expr adapter will be used for 
predicate pushdown and the custom schema adapter will be used for projections
-    let listing_table_config =
-        ListingTableConfig::new(ListingTableUrl::parse("memory:///").unwrap())
-            .infer_options(&ctx.state())
-            .await
-            .unwrap()
-            .with_schema(table_schema.clone())
-            .with_schema_adapter_factory(Arc::new(CustomSchemaAdapterFactory))
-            
.with_expr_adapter_factory(Arc::new(CustomPhysicalExprAdapterFactory));
-    let table = ListingTable::try_new(listing_table_config).unwrap();
-    ctx.deregister_table("t").unwrap();
-    ctx.register_table("t", Arc::new(table)).unwrap();
+    // Verify predicates work with default values
+    // c3 = 1 should match all rows since default is 1
     let batches = ctx
-        .sql("SELECT c2, c1 FROM t WHERE c1 = 2 AND c2 = 'b'")
+        .sql("SELECT c1 FROM t WHERE c3 = 1 ORDER BY c1")
         .await
         .unwrap()
         .collect()
         .await
         .unwrap();
+
+    #[rustfmt::skip]
     let expected = [
-        "+----+----+",
-        "| c2 | c1 |",
-        "+----+----+",
-        "| a  | 2  |",
-        "+----+----+",
+        "+----+",
+        "| c1 |",
+        "+----+",
+        "| 10 |",
+        "| 20 |",
+        "| 30 |",
+        "+----+",
     ];
     assert_batches_eq!(expected, &batches);
-}
-
-/// A test schema adapter factory that adds prefix to column names
-#[derive(Debug)]
-struct PrefixAdapterFactory {
-    prefix: String,
-}
-
-impl SchemaAdapterFactory for PrefixAdapterFactory {
-    fn create(
-        &self,
-        projected_table_schema: SchemaRef,
-        _table_schema: SchemaRef,
-    ) -> Box<dyn SchemaAdapter> {
-        Box::new(PrefixAdapter {
-            input_schema: projected_table_schema,
-            prefix: self.prefix.clone(),
-        })
-    }
-}
-
-/// A test schema adapter that adds prefix to column names
-#[derive(Debug)]
-struct PrefixAdapter {
-    input_schema: SchemaRef,
-    prefix: String,
-}
-
-impl SchemaAdapter for PrefixAdapter {
-    fn map_column_index(&self, index: usize, file_schema: &Schema) -> 
Option<usize> {
-        let field = self.input_schema.field(index);
-        file_schema.fields.find(field.name()).map(|(i, _)| i)
-    }
-
-    fn map_schema(
-        &self,
-        file_schema: &Schema,
-    ) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
-        let mut projection = Vec::with_capacity(file_schema.fields().len());
-        for (file_idx, file_field) in file_schema.fields().iter().enumerate() {
-            if self.input_schema.fields().find(file_field.name()).is_some() {
-                projection.push(file_idx);
-            }
-        }
-
-        // Create a schema mapper that adds a prefix to column names
-        #[derive(Debug)]
-        struct PrefixSchemaMapping {
-            // Keep only the prefix field which is actually used in the 
implementation
-            prefix: String,
-        }
-
-        impl SchemaMapper for PrefixSchemaMapping {
-            fn map_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
-                // Create a new schema with prefixed field names
-                let prefixed_fields: Vec<Field> = batch
-                    .schema()
-                    .fields()
-                    .iter()
-                    .map(|field| {
-                        Field::new(
-                            format!("{}{}", self.prefix, field.name()),
-                            field.data_type().clone(),
-                            field.is_nullable(),
-                        )
-                    })
-                    .collect();
-                let prefixed_schema = Arc::new(Schema::new(prefixed_fields));
-
-                // Create a new batch with the prefixed schema but the same 
data
-                let options = RecordBatchOptions::default();
-                RecordBatch::try_new_with_options(
-                    prefixed_schema,
-                    batch.columns().to_vec(),
-                    &options,
-                )
-                .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))
-            }
-
-            fn map_column_statistics(
-                &self,
-                stats: &[ColumnStatistics],
-            ) -> Result<Vec<ColumnStatistics>> {
-                // For testing, just return the input statistics
-                Ok(stats.to_vec())
-            }
-        }
 
-        Ok((
-            Arc::new(PrefixSchemaMapping {
-                prefix: self.prefix.clone(),
-            }),
-            projection,
-        ))
-    }
-}
-
-#[test]
-fn test_apply_schema_adapter_with_factory() {
-    // Create a schema
-    let schema = Arc::new(Schema::new(vec![
-        Field::new("id", DataType::Int32, false),
-        Field::new("name", DataType::Utf8, true),
-    ]));
-
-    // Create a parquet source
-    let source = ParquetSource::new(schema.clone());
-
-    // Create a file scan config with source that has a schema adapter factory
-    let factory = Arc::new(PrefixAdapterFactory {
-        prefix: "test_".to_string(),
-    });
-
-    let file_source = 
source.clone().with_schema_adapter_factory(factory).unwrap();
-
-    let config =
-        FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), 
file_source)
-            .build();
-
-    // Apply schema adapter to a new source
-    let result_source = source.apply_schema_adapter(&config).unwrap();
-
-    // Verify the adapter was applied
-    assert!(result_source.schema_adapter_factory().is_some());
-
-    // Create adapter and test it produces expected schema
-    let adapter_factory = result_source.schema_adapter_factory().unwrap();
-    let adapter = adapter_factory.create(schema.clone(), schema.clone());
-
-    // Create a dummy batch to test the schema mapping
-    let dummy_batch = RecordBatch::new_empty(schema.clone());
-
-    // Get the file schema (which is the same as the table schema in this test)
-    let (mapper, _) = adapter.map_schema(&schema).unwrap();
-
-    // Apply the mapping to get the output schema
-    let mapped_batch = mapper.map_batch(dummy_batch).unwrap();
-    let output_schema = mapped_batch.schema();
-
-    // Check the column names have the prefix
-    assert_eq!(output_schema.field(0).name(), "test_id");
-    assert_eq!(output_schema.field(1).name(), "test_name");
-}
-
-#[test]
-fn test_apply_schema_adapter_without_factory() {
-    // Create a schema
-    let schema = Arc::new(Schema::new(vec![
-        Field::new("id", DataType::Int32, false),
-        Field::new("name", DataType::Utf8, true),
-    ]));
-
-    // Create a parquet source
-    let source = ParquetSource::new(schema.clone());
-
-    // Convert to Arc<dyn FileSource>
-    let file_source: Arc<dyn FileSource> = Arc::new(source.clone());
-
-    // Create a file scan config without a schema adapter factory
-    let config =
-        FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), 
file_source)
-            .build();
-
-    // Apply schema adapter function - should pass through the source unchanged
-    let result_source = source.apply_schema_adapter(&config).unwrap();
+    // c3 = 999 should match no rows
+    let batches = ctx
+        .sql("SELECT c1 FROM t WHERE c3 = 999")
+        .await
+        .unwrap()
+        .collect()
+        .await
+        .unwrap();
 
-    // Verify no adapter was applied
-    assert!(result_source.schema_adapter_factory().is_none());
+    #[rustfmt::skip]
+    let expected = [
+        "++",
+        "++",
+    ];
+    assert_batches_eq!(expected, &batches);
 }
diff --git 
a/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs 
b/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs
index 1915298164..f0d09d7134 100644
--- a/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs
+++ b/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs
@@ -18,36 +18,66 @@
 use std::sync::Arc;
 
 use arrow::array::RecordBatch;
-use arrow_schema::{DataType, Field, Schema, SchemaRef};
+
+use arrow_schema::{DataType, Field, FieldRef, Schema, SchemaRef};
 use bytes::{BufMut, BytesMut};
 use datafusion::common::Result;
+use datafusion::config::{ConfigOptions, TableParquetOptions};
 use datafusion::datasource::listing::PartitionedFile;
+#[cfg(feature = "parquet")]
+use datafusion::datasource::physical_plan::ParquetSource;
 use datafusion::datasource::physical_plan::{
-    ArrowSource, CsvSource, FileSource, JsonSource, ParquetSource,
+    ArrowSource, CsvSource, FileSource, JsonSource,
 };
+use datafusion::logical_expr::{col, lit};
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::SessionContext;
 use datafusion_common::config::CsvOptions;
-use datafusion_common::ColumnStatistics;
+use datafusion_common::record_batch;
+use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
+use datafusion_common::{ColumnStatistics, ScalarValue};
 use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
 use datafusion_datasource::schema_adapter::{
     SchemaAdapter, SchemaAdapterFactory, SchemaMapper,
 };
+
+use datafusion::assert_batches_eq;
 use datafusion_datasource::source::DataSourceExec;
 use datafusion_datasource::TableSchema;
 use datafusion_execution::object_store::ObjectStoreUrl;
+use datafusion_expr::Expr;
+use datafusion_physical_expr::expressions::Column;
+use datafusion_physical_expr::planner::logical2physical;
+use datafusion_physical_expr::projection::ProjectionExprs;
+use datafusion_physical_expr_adapter::{PhysicalExprAdapter, 
PhysicalExprAdapterFactory};
+use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
 use object_store::{memory::InMemory, path::Path, ObjectStore};
 use parquet::arrow::ArrowWriter;
 
 async fn write_parquet(batch: RecordBatch, store: Arc<dyn ObjectStore>, path: 
&str) {
+    write_batches_to_parquet(&[batch], store, path).await;
+}
+
+/// Write RecordBatches to a Parquet file with each batch in its own row group.
+async fn write_batches_to_parquet(
+    batches: &[RecordBatch],
+    store: Arc<dyn ObjectStore>,
+    path: &str,
+) -> usize {
     let mut out = BytesMut::new().writer();
     {
-        let mut writer = ArrowWriter::try_new(&mut out, batch.schema(), 
None).unwrap();
-        writer.write(&batch).unwrap();
+        let mut writer =
+            ArrowWriter::try_new(&mut out, batches[0].schema(), None).unwrap();
+        for batch in batches {
+            writer.write(batch).unwrap();
+            writer.flush().unwrap();
+        }
         writer.finish().unwrap();
     }
     let data = out.into_inner().freeze();
+    let file_size = data.len();
     store.put(&Path::from(path), data.into()).await.unwrap();
+    file_size
 }
 
 /// A schema adapter factory that transforms column names to uppercase
@@ -156,71 +186,414 @@ impl SchemaMapper for UppercaseSchemaMapper {
     }
 }
 
-#[cfg(feature = "parquet")]
-#[tokio::test]
-async fn test_parquet_integration_with_schema_adapter() -> Result<()> {
-    // Create test data
-    let batch = RecordBatch::try_new(
-        Arc::new(Schema::new(vec![
-            Field::new("id", DataType::Int32, false),
-            Field::new("name", DataType::Utf8, true),
-        ])),
-        vec![
-            Arc::new(arrow::array::Int32Array::from(vec![1, 2, 3])),
-            Arc::new(arrow::array::StringArray::from(vec!["a", "b", "c"])),
-        ],
-    )?;
+/// A physical expression adapter factory that maps uppercase column names to 
lowercase
+#[derive(Debug)]
+struct UppercasePhysicalExprAdapterFactory;
 
-    let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
-    let store_url = ObjectStoreUrl::parse("memory://").unwrap();
-    let path = "test.parquet";
-    write_parquet(batch.clone(), store.clone(), path).await;
+impl PhysicalExprAdapterFactory for UppercasePhysicalExprAdapterFactory {
+    fn create(
+        &self,
+        logical_file_schema: SchemaRef,
+        physical_file_schema: SchemaRef,
+    ) -> Arc<dyn PhysicalExprAdapter> {
+        Arc::new(UppercasePhysicalExprAdapter {
+            logical_file_schema,
+            physical_file_schema,
+        })
+    }
+}
 
-    // Get the actual file size from the object store
-    let object_meta = store.head(&Path::from(path)).await?;
-    let file_size = object_meta.size;
+#[derive(Debug)]
+struct UppercasePhysicalExprAdapter {
+    logical_file_schema: SchemaRef,
+    physical_file_schema: SchemaRef,
+}
 
-    // Create a session context and register the object store
-    let ctx = SessionContext::new();
-    ctx.register_object_store(store_url.as_ref(), Arc::clone(&store));
+impl PhysicalExprAdapter for UppercasePhysicalExprAdapter {
+    fn rewrite(&self, expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn 
PhysicalExpr>> {
+        expr.transform(|e| {
+            if let Some(column) = e.as_any().downcast_ref::<Column>() {
+                // Map uppercase column name (from logical schema) to 
lowercase (in physical file)
+                let lowercase_name = column.name().to_lowercase();
+                if let Ok(idx) = 
self.physical_file_schema.index_of(&lowercase_name) {
+                    return Ok(Transformed::yes(
+                        Arc::new(Column::new(&lowercase_name, idx))
+                            as Arc<dyn PhysicalExpr>,
+                    ));
+                }
+            }
+            Ok(Transformed::no(e))
+        })
+        .data()
+    }
 
-    // Create a table schema with uppercase column names
-    let table_schema = Arc::new(Schema::new(vec![
-        Field::new("ID", DataType::Int32, false),
-        Field::new("NAME", DataType::Utf8, true),
-    ]));
+    fn with_partition_values(
+        &self,
+        _partition_values: Vec<(FieldRef, ScalarValue)>,
+    ) -> Arc<dyn PhysicalExprAdapter> {
+        Arc::new(Self {
+            logical_file_schema: self.logical_file_schema.clone(),
+            physical_file_schema: self.physical_file_schema.clone(),
+        })
+    }
+}
 
-    // Create a ParquetSource with the adapter factory
-    let file_source = ParquetSource::new(table_schema.clone())
-        .with_schema_adapter_factory(Arc::new(UppercaseAdapterFactory {}))?;
+#[derive(Clone)]
+struct ParquetTestCase {
+    table_schema: TableSchema,
+    batches: Vec<RecordBatch>,
+    predicate: Option<Expr>,
+    projection: Option<ProjectionExprs>,
+    push_down_filters: bool,
+}
 
-    let config = FileScanConfigBuilder::new(store_url, file_source)
-        .with_file(PartitionedFile::new(path, file_size))
-        .build();
+impl ParquetTestCase {
+    fn new(table_schema: TableSchema, batches: Vec<RecordBatch>) -> Self {
+        Self {
+            table_schema,
+            batches,
+            predicate: None,
+            projection: None,
+            push_down_filters: true,
+        }
+    }
 
-    // Create a data source executor
-    let exec = DataSourceExec::from_data_source(config);
+    fn push_down_filters(mut self, pushdown_filters: bool) -> Self {
+        self.push_down_filters = pushdown_filters;
+        self
+    }
 
-    // Collect results
-    let task_ctx = ctx.task_ctx();
-    let stream = exec.execute(0, task_ctx)?;
-    let batches = datafusion::physical_plan::common::collect(stream).await?;
+    fn with_predicate(mut self, predicate: Expr) -> Self {
+        self.predicate = Some(predicate);
+        self
+    }
 
-    // There should be one batch
-    assert_eq!(batches.len(), 1);
+    fn with_projection(mut self, projection: ProjectionExprs) -> Self {
+        self.projection = Some(projection);
+        self
+    }
 
-    // Verify the schema has the uppercase column names
-    let result_schema = batches[0].schema();
-    assert_eq!(result_schema.field(0).name(), "ID");
-    assert_eq!(result_schema.field(1).name(), "NAME");
+    async fn execute(self) -> Result<Vec<RecordBatch>> {
+        let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+        let store_url = ObjectStoreUrl::parse("memory://").unwrap();
+        let path = "test.parquet";
+        let file_size =
+            write_batches_to_parquet(&self.batches, store.clone(), path).await;
+
+        let ctx = SessionContext::new();
+        ctx.register_object_store(store_url.as_ref(), Arc::clone(&store));
+
+        let mut table_options = TableParquetOptions::default();
+        // controlled via ConfigOptions flag; ParquetSources ORs them so if 
either is true then pushdown is enabled
+        table_options.global.pushdown_filters = false;
+        let mut file_source = Arc::new(
+            ParquetSource::new(self.table_schema.table_schema().clone())
+                .with_table_parquet_options(table_options),
+        ) as Arc<dyn FileSource>;
+
+        if let Some(projection) = self.projection {
+            file_source = 
file_source.try_pushdown_projection(&projection)?.unwrap();
+        }
+
+        if let Some(predicate) = &self.predicate {
+            let filter_expr =
+                logical2physical(predicate, self.table_schema.table_schema());
+            let mut config = ConfigOptions::default();
+            config.execution.parquet.pushdown_filters = self.push_down_filters;
+            let result = file_source.try_pushdown_filters(vec![filter_expr], 
&config)?;
+            file_source = result.updated_node.unwrap();
+        }
+
+        let config = FileScanConfigBuilder::new(store_url.clone(), file_source)
+            .with_file(PartitionedFile::new(path, file_size as u64)) // size 0 
for test
+            .with_expr_adapter(None)
+            .build();
+
+        let exec = DataSourceExec::from_data_source(config);
+        let task_ctx = ctx.task_ctx();
+        let stream = exec.execute(0, task_ctx)?;
+        datafusion::physical_plan::common::collect(stream).await
+    }
+}
+
+/// Test reading and filtering a Parquet file where the table schema is 
flipped (c, b, a) vs. the physical file schema (a, b, c)
+#[tokio::test]
+#[cfg(feature = "parquet")]
+async fn test_parquet_flipped_projection() -> Result<()> {
+    // Create test data with columns (a, b, c) - the file schema
+    let batch1 = record_batch!(
+        ("a", Int32, vec![1, 2]),
+        ("b", Utf8, vec!["x", "y"]),
+        ("c", Float64, vec![1.1, 2.2])
+    )?;
+    let batch2 = record_batch!(
+        ("a", Int32, vec![3]),
+        ("b", Utf8, vec!["z"]),
+        ("c", Float64, vec![3.3])
+    )?;
+
+    // Create a table schema with flipped column order (c, b, a)
+    let table_schema = Arc::new(Schema::new(vec![
+        Field::new("c", DataType::Float64, false),
+        Field::new("b", DataType::Utf8, true),
+        Field::new("a", DataType::Int32, false),
+    ]));
+    let table_schema = TableSchema::from_file_schema(table_schema);
+
+    let test_case = ParquetTestCase::new(table_schema.clone(), vec![batch1, 
batch2]);
+
+    // Test reading with flipped schema
+    let batches = test_case.clone().execute().await?;
+    #[rustfmt::skip]
+    let expected = [
+        "+-----+---+---+",
+        "| c   | b | a |",
+        "+-----+---+---+",
+        "| 1.1 | x | 1 |",
+        "| 2.2 | y | 2 |",
+        "| 3.3 | z | 3 |",
+        "+-----+---+---+",
+    ];
+    assert_batches_eq!(expected, &batches);
+
+    // Test with a projection that selects (b, a)
+    let projection = ProjectionExprs::from_indices(&[1, 2], 
table_schema.table_schema());
+    let batches = test_case
+        .clone()
+        .with_projection(projection.clone())
+        .execute()
+        .await?;
+    #[rustfmt::skip]
+    let expected = [
+        "+---+---+",
+        "| b | a |",
+        "+---+---+",
+        "| x | 1 |",
+        "| y | 2 |",
+        "| z | 3 |",
+        "+---+---+",
+    ];
+    assert_batches_eq!(expected, &batches);
+
+    // Test with a filter on b, a
+    // a = 1 or b != 'foo' and a = 3 -> matches [{a=1,b=x},{b=z,a=3}]
+    let filter = col("a")
+        .eq(lit(1))
+        .or(col("b").not_eq(lit("foo")).and(col("a").eq(lit(3))));
+    let batches = test_case
+        .clone()
+        .with_projection(projection.clone())
+        .with_predicate(filter.clone())
+        .execute()
+        .await?;
+    #[rustfmt::skip]
+    let expected = [
+        "+---+---+",
+        "| b | a |",
+        "+---+---+",
+        "| x | 1 |",
+        "| z | 3 |",
+        "+---+---+",
+    ];
+    assert_batches_eq!(expected, &batches);
+
+    // Test with only statistics-based filter pushdown (no row-level filtering)
+    // Since we have 2 row groups and the filter matches rows in both, stats 
pruning alone won't filter any
+    let batches = test_case
+        .clone()
+        .with_projection(projection)
+        .with_predicate(filter)
+        .push_down_filters(false)
+        .execute()
+        .await?;
+    #[rustfmt::skip]
+    let expected = [
+        "+---+---+",
+        "| b | a |",
+        "+---+---+",
+        "| x | 1 |",
+        "| y | 2 |",
+        "| z | 3 |",
+        "+---+---+",
+    ];
+    assert_batches_eq!(expected, &batches);
+
+    // Test with a filter that can prune via statistics: a > 10 (no rows match)
+    let filter = col("a").gt(lit(10));
+    let batches = test_case
+        .clone()
+        .with_predicate(filter)
+        .push_down_filters(false)
+        .execute()
+        .await?;
+    // Stats show a has max=3, so a > 10 prunes all row groups
+    assert_eq!(batches.len(), 0);
+
+    // With a filter that matches only the first row group: a < 3
+    let filter = col("a").lt(lit(3));
+    let batches = test_case
+        .clone()
+        .with_predicate(filter)
+        .push_down_filters(false)
+        .execute()
+        .await?;
+    #[rustfmt::skip]
+    let expected = [
+        "+-----+---+---+",
+        "| c   | b | a |",
+        "+-----+---+---+",
+        "| 1.1 | x | 1 |",
+        "| 2.2 | y | 2 |",
+        "+-----+---+---+",
+    ];
+    assert_batches_eq!(expected, &batches);
 
     Ok(())
 }
 
+/// Test reading a Parquet file that is missing a column specified in the 
table schema, which should get filled in with nulls by default.
+/// We test with the file having columns (a, c) and the table schema having 
(a, b, c)
+#[tokio::test]
 #[cfg(feature = "parquet")]
+async fn test_parquet_missing_column() -> Result<()> {
+    // Create test data with columns (a, c) as 2 batches
+    // | a | c   |
+    // |---|-----|
+    // | 1 | 1.1 |
+    // | 2 | 2.2 |
+    // | ~ | ~~~ |
+    // | 3 | 3.3 |
+    let batch1 = record_batch!(("a", Int32, vec![1, 2]), ("c", Float64, 
vec![1.1, 2.2]))?;
+    let batch2 = record_batch!(("a", Int32, vec![3]), ("c", Float64, 
vec![3.3]))?;
+
+    // Create a table schema with an extra column 'b' (a, b, c)
+    let logical_file_schema = Arc::new(Schema::new(vec![
+        Field::new("a", DataType::Int32, false),
+        Field::new("b", DataType::Utf8, true),
+        Field::new("c", DataType::Float64, false),
+    ]));
+    let table_schema = 
TableSchema::from_file_schema(logical_file_schema.clone());
+
+    let test_case = ParquetTestCase::new(table_schema.clone(), vec![batch1, 
batch2]);
+
+    let batches = test_case.clone().execute().await?;
+    #[rustfmt::skip]
+    let expected = [
+        "+---+---+-----+",
+        "| a | b | c   |",
+        "+---+---+-----+",
+        "| 1 |   | 1.1 |",
+        "| 2 |   | 2.2 |",
+        "| 3 |   | 3.3 |",
+        "+---+---+-----+",
+    ];
+    assert_batches_eq!(expected, &batches);
+
+    // And with a projection applied that selects (`c, `a`, `b`)
+    let projection =
+        ProjectionExprs::from_indices(&[2, 0, 1], table_schema.table_schema());
+    let batches = test_case
+        .clone()
+        .with_projection(projection)
+        .execute()
+        .await?;
+    #[rustfmt::skip]
+    let expected = [
+        "+-----+---+---+",
+        "| c   | a | b |",
+        "+-----+---+---+",
+        "| 1.1 | 1 |   |",
+        "| 2.2 | 2 |   |",
+        "| 3.3 | 3 |   |",
+        "+-----+---+---+",
+    ];
+    assert_batches_eq!(expected, &batches);
+
+    // And with a filter on a, b
+    // a = 1 or b is null and a = 3
+    let filter = col("a")
+        .eq(lit(1))
+        .or(col("b").is_null().and(col("a").eq(lit(3))));
+    let batches = test_case
+        .clone()
+        .with_predicate(filter.clone())
+        .execute()
+        .await?;
+    #[rustfmt::skip]
+    let expected = [
+        "+---+---+-----+",
+        "| a | b | c   |",
+        "+---+---+-----+",
+        "| 1 |   | 1.1 |",
+        "| 3 |   | 3.3 |",
+        "+---+---+-----+",
+    ];
+    assert_batches_eq!(expected, &batches);
+    // With only statistics-based filter pushdown
+    let batches = test_case
+        .clone()
+        .with_predicate(filter)
+        .push_down_filters(false)
+        .execute()
+        .await?;
+    #[rustfmt::skip]
+    let expected = [
+        "+---+---+-----+",
+        "| a | b | c   |",
+        "+---+---+-----+",
+        "| 1 |   | 1.1 |",
+        "| 2 |   | 2.2 |",
+        "| 3 |   | 3.3 |",
+        "+---+---+-----+",
+    ];
+    assert_batches_eq!(expected, &batches);
+
+    // Filter `b is not null or a = 24` doesn't match any rows
+    let filter = col("b").is_not_null().or(col("a").eq(lit(24)));
+    let batches = test_case
+        .clone()
+        .with_predicate(filter.clone())
+        .execute()
+        .await?;
+    // There should be zero batches
+    assert_eq!(batches.len(), 0);
+    // With only statistics-based filter pushdown
+    let batches = test_case
+        .clone()
+        .with_predicate(filter)
+        .push_down_filters(false)
+        .execute()
+        .await?;
+    // There will be data: the filter is (null) is not null or a = 24.
+    // Statistics pruning doesn't handle `null is not null` so it resolves to 
`true or a = 24` -> `true` so no row groups are pruned
+    #[rustfmt::skip]
+    let expected = [
+        "+---+---+-----+",
+        "| a | b | c   |",
+        "+---+---+-----+",
+        "| 1 |   | 1.1 |",
+        "| 2 |   | 2.2 |",
+        "| 3 |   | 3.3 |",
+        "+---+---+-----+",
+    ];
+    assert_batches_eq!(expected, &batches);
+    // On the other hand the filter `b = 'foo' and a = 24` should prune all 
data even with only statistics-based pushdown
+    let filter = col("b").eq(lit("foo")).and(col("a").eq(lit(24)));
+    let batches = test_case
+        .clone()
+        .with_predicate(filter)
+        .push_down_filters(false)
+        .execute()
+        .await?;
+    // There should be zero batches
+    assert_eq!(batches.len(), 0);
+
+    Ok(())
+}
+
 #[tokio::test]
-async fn test_parquet_integration_with_schema_adapter_and_expression_rewriter(
-) -> Result<()> {
+#[cfg(feature = "parquet")]
+async fn test_parquet_integration_with_physical_expr_adapter() -> Result<()> {
     // Create test data
     let batch = RecordBatch::try_new(
         Arc::new(Schema::new(vec![
@@ -246,12 +619,19 @@ async fn 
test_parquet_integration_with_schema_adapter_and_expression_rewriter(
     let ctx = SessionContext::new();
     ctx.register_object_store(store_url.as_ref(), Arc::clone(&store));
 
-    // Create a ParquetSource with the adapter factory
-    let file_source = ParquetSource::new(batch.schema())
-        .with_schema_adapter_factory(Arc::new(UppercaseAdapterFactory {}))?;
+    // Create a table schema with uppercase column names
+    let table_schema = Arc::new(Schema::new(vec![
+        Field::new("ID", DataType::Int32, false),
+        Field::new("NAME", DataType::Utf8, true),
+    ]));
+
+    // Create a ParquetSource with the table schema (uppercase columns)
+    let file_source = Arc::new(ParquetSource::new(table_schema.clone()));
 
+    // Use PhysicalExprAdapterFactory to map uppercase column names to 
lowercase
     let config = FileScanConfigBuilder::new(store_url, file_source)
         .with_file(PartitionedFile::new(path, file_size))
+        .with_expr_adapter(Some(Arc::new(UppercasePhysicalExprAdapterFactory)))
         .build();
 
     // Create a data source executor
@@ -265,10 +645,28 @@ async fn 
test_parquet_integration_with_schema_adapter_and_expression_rewriter(
     // There should be one batch
     assert_eq!(batches.len(), 1);
 
-    // Verify the schema has the original column names (schema adapter not 
applied in DataSourceExec)
+    // Verify the schema has the uppercase column names
     let result_schema = batches[0].schema();
-    assert_eq!(result_schema.field(0).name(), "id");
-    assert_eq!(result_schema.field(1).name(), "name");
+    assert_eq!(result_schema.field(0).name(), "ID");
+    assert_eq!(result_schema.field(1).name(), "NAME");
+
+    // Verify the data was correctly read from the lowercase file columns
+    // This confirms the PhysicalExprAdapter successfully mapped uppercase -> 
lowercase
+    let id_array = batches[0]
+        .column(0)
+        .as_any()
+        .downcast_ref::<arrow::array::Int32Array>()
+        .expect("Expected Int32Array for ID column");
+    assert_eq!(id_array.values(), &[1, 2, 3]);
+
+    let name_array = batches[0]
+        .column(1)
+        .as_any()
+        .downcast_ref::<arrow::array::StringArray>()
+        .expect("Expected StringArray for NAME column");
+    assert_eq!(name_array.value(0), "a");
+    assert_eq!(name_array.value(1), "b");
+    assert_eq!(name_array.value(2), "c");
 
     Ok(())
 }
@@ -306,28 +704,6 @@ async fn test_multi_source_schema_adapter_reuse() -> 
Result<()> {
         );
     }
 
-    // Test ParquetSource
-    #[cfg(feature = "parquet")]
-    {
-        let schema =
-            Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, 
false)]));
-        let source = ParquetSource::new(schema);
-        let source_with_adapter = source
-            .clone()
-            .with_schema_adapter_factory(factory.clone())
-            .unwrap();
-
-        let base_source: Arc<dyn FileSource> = source.into();
-        assert!(base_source.schema_adapter_factory().is_none());
-        assert!(source_with_adapter.schema_adapter_factory().is_some());
-
-        let retrieved_factory = 
source_with_adapter.schema_adapter_factory().unwrap();
-        assert_eq!(
-            format!("{:?}", retrieved_factory.as_ref()),
-            format!("{:?}", factory.as_ref())
-        );
-    }
-
     // Test CsvSource
     {
         let schema =
diff --git a/datafusion/datasource-parquet/src/file_format.rs 
b/datafusion/datasource-parquet/src/file_format.rs
index a2ce16cd53..9cc061cc45 100644
--- a/datafusion/datasource-parquet/src/file_format.rs
+++ b/datafusion/datasource-parquet/src/file_format.rs
@@ -483,11 +483,8 @@ impl FileFormat for ParquetFormat {
 
         source = self.set_source_encryption_factory(source, state)?;
 
-        // Apply schema adapter factory before building the new config
-        let file_source = source.apply_schema_adapter(&conf)?;
-
         let conf = FileScanConfigBuilder::from(conf)
-            .with_source(file_source)
+            .with_source(Arc::new(source))
             .build();
         Ok(DataSourceExec::from_data_source(conf))
     }
diff --git a/datafusion/datasource-parquet/src/opener.rs 
b/datafusion/datasource-parquet/src/opener.rs
index fd2907b86b..08534f8076 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -25,7 +25,8 @@ use crate::{
 };
 use arrow::array::RecordBatch;
 use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
-use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
+use datafusion_physical_expr::projection::ProjectionExprs;
+use datafusion_physical_expr::utils::reassign_expr_columns;
 use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll};
@@ -97,8 +98,6 @@ pub(super) struct ParquetOpener {
     /// Should the bloom filter be read from parquet, if present, to skip row
     /// groups
     pub enable_bloom_filter: bool,
-    /// Schema adapter factory
-    pub schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
     /// Should row group pruning be applied
     pub enable_row_group_stats_pruning: bool,
     /// Coerce INT96 timestamps to specific TimeUnit
@@ -140,12 +139,7 @@ impl FileOpener for ParquetOpener {
 
         let batch_size = self.batch_size;
 
-        let projected_schema =
-            
SchemaRef::from(self.logical_file_schema.project(&self.projection)?);
-        let schema_adapter_factory = Arc::clone(&self.schema_adapter_factory);
-        let schema_adapter = self
-            .schema_adapter_factory
-            .create(projected_schema, Arc::clone(&self.logical_file_schema));
+        let projection = Arc::clone(&self.projection);
         let mut predicate = self.predicate.clone();
         let logical_file_schema = Arc::clone(&self.logical_file_schema);
         let partition_fields = self.partition_fields.clone();
@@ -269,7 +263,7 @@ impl FileOpener for ParquetOpener {
 
             // Adapt the predicate to the physical file schema.
             // This evaluates missing columns and inserts any necessary casts.
-            if let Some(expr_adapter_factory) = expr_adapter_factory {
+            if let Some(expr_adapter_factory) = expr_adapter_factory.as_ref() {
                 predicate = predicate
                     .map(|p| {
                         let partition_values = partition_fields
@@ -320,13 +314,26 @@ impl FileOpener for ParquetOpener {
                 reader_metadata,
             );
 
-            let (schema_mapping, adapted_projections) =
-                schema_adapter.map_schema(&physical_file_schema)?;
+            let mut projection =
+                ProjectionExprs::from_indices(&projection, 
&logical_file_schema);
+            if let Some(expr_adapter_factory) = expr_adapter_factory {
+                let adapter = expr_adapter_factory
+                    .create(
+                        Arc::clone(&logical_file_schema),
+                        Arc::clone(&physical_file_schema),
+                    )
+                    .with_partition_values(
+                        partition_fields
+                            .iter()
+                            .cloned()
+                            .zip(partitioned_file.partition_values.clone())
+                            .collect_vec(),
+                    );
+                projection = projection.try_map_exprs(|expr| 
adapter.rewrite(expr))?;
+            }
+            let indices = projection.column_indices();
 
-            let mask = ProjectionMask::roots(
-                builder.parquet_schema(),
-                adapted_projections.iter().cloned(),
-            );
+            let mask = ProjectionMask::roots(builder.parquet_schema(), 
indices);
 
             // Filter pushdown: evaluate predicates during scan
             if let Some(predicate) = 
pushdown_filters.then_some(predicate).flatten() {
@@ -337,7 +344,6 @@ impl FileOpener for ParquetOpener {
                     builder.metadata(),
                     reorder_predicates,
                     &file_metrics,
-                    &schema_adapter_factory,
                 );
 
                 match row_filter {
@@ -464,6 +470,16 @@ impl FileOpener for ParquetOpener {
                 file_metrics.predicate_cache_inner_records.clone();
             let predicate_cache_records = 
file_metrics.predicate_cache_records.clone();
 
+            let stream_schema = Arc::clone(stream.schema());
+
+            // Rebase column indices to match the narrowed stream schema.
+            // The projection expressions have indices based on 
physical_file_schema,
+            // but the stream only contains the columns selected by the 
ProjectionMask.
+            let projection = projection
+                .try_map_exprs(|expr| reassign_expr_columns(expr, 
&stream_schema))?;
+
+            let projector = projection.make_projector(&stream_schema)?;
+
             let stream = stream.map_err(DataFusionError::from).map(move |b| {
                 b.and_then(|b| {
                     copy_arrow_reader_metrics(
@@ -471,7 +487,7 @@ impl FileOpener for ParquetOpener {
                         &predicate_cache_inner_records,
                         &predicate_cache_records,
                     );
-                    schema_mapping.map_batch(b)
+                    projector.project_batch(&b)
                 })
             });
 
@@ -764,29 +780,19 @@ fn should_enable_page_index(
 mod test {
     use std::sync::Arc;
 
-    use arrow::{
-        compute::cast,
-        datatypes::{DataType, Field, Schema, SchemaRef},
-    };
+    use arrow::datatypes::{DataType, Field, Schema};
     use bytes::{BufMut, BytesMut};
     use datafusion_common::{
-        assert_batches_eq, record_batch, stats::Precision, ColumnStatistics,
-        DataFusionError, ScalarValue, Statistics,
-    };
-    use datafusion_datasource::{
-        file_stream::FileOpener,
-        schema_adapter::{
-            DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory,
-            SchemaMapper,
-        },
-        PartitionedFile,
+        record_batch, stats::Precision, ColumnStatistics, DataFusionError, 
ScalarValue,
+        Statistics,
     };
+    use datafusion_datasource::{file_stream::FileOpener, PartitionedFile};
     use datafusion_expr::{col, lit};
     use datafusion_physical_expr::{
         expressions::DynamicFilterPhysicalExpr, planner::logical2physical, 
PhysicalExpr,
     };
     use datafusion_physical_expr_adapter::DefaultPhysicalExprAdapterFactory;
-    use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, 
MetricsSet};
+    use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
     use futures::{Stream, StreamExt};
     use object_store::{memory::InMemory, path::Path, ObjectStore};
     use parquet::arrow::ArrowWriter;
@@ -810,21 +816,6 @@ mod test {
         (num_batches, num_rows)
     }
 
-    async fn collect_batches(
-        mut stream: std::pin::Pin<
-            Box<
-                dyn Stream<Item = Result<arrow::array::RecordBatch, 
DataFusionError>>
-                    + Send,
-            >,
-        >,
-    ) -> Vec<arrow::array::RecordBatch> {
-        let mut batches = vec![];
-        while let Some(Ok(batch)) = stream.next().await {
-            batches.push(batch);
-        }
-        batches
-    }
-
     async fn write_parquet(
         store: Arc<dyn ObjectStore>,
         filename: &str,
@@ -898,7 +889,6 @@ mod test {
                 force_filter_selections: false,
                 enable_page_index: false,
                 enable_bloom_filter: false,
-                schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
                 enable_row_group_stats_pruning: true,
                 coerce_int96: None,
                 #[cfg(feature = "parquet_encryption")]
@@ -972,7 +962,6 @@ mod test {
                 force_filter_selections: false,
                 enable_page_index: false,
                 enable_bloom_filter: false,
-                schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
                 enable_row_group_stats_pruning: true,
                 coerce_int96: None,
                 #[cfg(feature = "parquet_encryption")]
@@ -1062,7 +1051,6 @@ mod test {
                 force_filter_selections: false,
                 enable_page_index: false,
                 enable_bloom_filter: false,
-                schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
                 enable_row_group_stats_pruning: true,
                 coerce_int96: None,
                 #[cfg(feature = "parquet_encryption")]
@@ -1155,7 +1143,6 @@ mod test {
                 force_filter_selections: false,
                 enable_page_index: false,
                 enable_bloom_filter: false,
-                schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
                 enable_row_group_stats_pruning: false, // note that this is 
false!
                 coerce_int96: None,
                 #[cfg(feature = "parquet_encryption")]
@@ -1248,7 +1235,6 @@ mod test {
                 force_filter_selections: false,
                 enable_page_index: false,
                 enable_bloom_filter: false,
-                schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
                 enable_row_group_stats_pruning: true,
                 coerce_int96: None,
                 #[cfg(feature = "parquet_encryption")]
@@ -1277,155 +1263,4 @@ mod test {
         assert_eq!(num_batches, 0);
         assert_eq!(num_rows, 0);
     }
-
-    fn get_value(metrics: &MetricsSet, metric_name: &str) -> usize {
-        match metrics.sum_by_name(metric_name) {
-            Some(v) => v.as_usize(),
-            _ => {
-                panic!(
-                    "Expected metric not found. Looking for '{metric_name}' 
in\n\n{metrics:#?}"
-                );
-            }
-        }
-    }
-
-    #[tokio::test]
-    async fn test_custom_schema_adapter_no_rewriter() {
-        // Make a hardcoded schema adapter that adds a new column "b" with 
default value 0.0
-        // and converts the first column "a" from Int32 to UInt64.
-        #[derive(Debug, Clone)]
-        struct CustomSchemaMapper;
-
-        impl SchemaMapper for CustomSchemaMapper {
-            fn map_batch(
-                &self,
-                batch: arrow::array::RecordBatch,
-            ) -> datafusion_common::Result<arrow::array::RecordBatch> {
-                let a_column = cast(batch.column(0), &DataType::UInt64)?;
-                // Add in a new column "b" with default value 0.0
-                let b_column =
-                    arrow::array::Float64Array::from(vec![Some(0.0); 
batch.num_rows()]);
-                let columns = vec![a_column, Arc::new(b_column)];
-                let new_schema = Arc::new(Schema::new(vec![
-                    Field::new("a", DataType::UInt64, false),
-                    Field::new("b", DataType::Float64, false),
-                ]));
-                Ok(arrow::record_batch::RecordBatch::try_new(
-                    new_schema, columns,
-                )?)
-            }
-
-            fn map_column_statistics(
-                &self,
-                file_col_statistics: &[ColumnStatistics],
-            ) -> datafusion_common::Result<Vec<ColumnStatistics>> {
-                Ok(vec![
-                    file_col_statistics[0].clone(),
-                    ColumnStatistics::new_unknown(),
-                ])
-            }
-        }
-
-        #[derive(Debug, Clone)]
-        struct CustomSchemaAdapter;
-
-        impl SchemaAdapter for CustomSchemaAdapter {
-            fn map_schema(
-                &self,
-                _file_schema: &Schema,
-            ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)>
-            {
-                let mapper = Arc::new(CustomSchemaMapper);
-                let projection = vec![0]; // We only need to read the first 
column "a" from the file
-                Ok((mapper, projection))
-            }
-
-            fn map_column_index(
-                &self,
-                index: usize,
-                file_schema: &Schema,
-            ) -> Option<usize> {
-                if index < file_schema.fields().len() {
-                    Some(index)
-                } else {
-                    None // The new column "b" is not in the original schema
-                }
-            }
-        }
-
-        #[derive(Debug, Clone)]
-        struct CustomSchemaAdapterFactory;
-
-        impl SchemaAdapterFactory for CustomSchemaAdapterFactory {
-            fn create(
-                &self,
-                _projected_table_schema: SchemaRef,
-                _table_schema: SchemaRef,
-            ) -> Box<dyn SchemaAdapter> {
-                Box::new(CustomSchemaAdapter)
-            }
-        }
-
-        // Test that if no expression rewriter is provided we use a 
schemaadapter to adapt the data to the expression
-        let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
-        let batch = record_batch!(("a", Int32, vec![Some(1), Some(2), 
Some(3)])).unwrap();
-        // Write out the batch to a Parquet file
-        let data_size =
-            write_parquet(Arc::clone(&store), "test.parquet", 
batch.clone()).await;
-        let file = PartitionedFile::new(
-            "test.parquet".to_string(),
-            u64::try_from(data_size).unwrap(),
-        );
-        let table_schema = Arc::new(Schema::new(vec![
-            Field::new("a", DataType::UInt64, false),
-            Field::new("b", DataType::Float64, false),
-        ]));
-
-        let make_opener = |predicate| ParquetOpener {
-            partition_index: 0,
-            projection: Arc::new([0, 1]),
-            batch_size: 1024,
-            limit: None,
-            predicate: Some(predicate),
-            logical_file_schema: Arc::clone(&table_schema),
-            metadata_size_hint: None,
-            metrics: ExecutionPlanMetricsSet::new(),
-            parquet_file_reader_factory: 
Arc::new(DefaultParquetFileReaderFactory::new(
-                Arc::clone(&store),
-            )),
-            partition_fields: vec![],
-            pushdown_filters: true,
-            reorder_filters: false,
-            force_filter_selections: false,
-            enable_page_index: false,
-            enable_bloom_filter: false,
-            schema_adapter_factory: Arc::new(CustomSchemaAdapterFactory),
-            enable_row_group_stats_pruning: false,
-            coerce_int96: None,
-            #[cfg(feature = "parquet_encryption")]
-            file_decryption_properties: None,
-            expr_adapter_factory: None,
-            #[cfg(feature = "parquet_encryption")]
-            encryption_factory: None,
-            max_predicate_cache_size: None,
-        };
-
-        let predicate = logical2physical(&col("a").eq(lit(1u64)), 
&table_schema);
-        let opener = make_opener(predicate);
-        let stream = opener.open(file.clone()).unwrap().await.unwrap();
-        let batches = collect_batches(stream).await;
-
-        #[rustfmt::skip]
-        let expected = [
-            "+---+-----+",
-            "| a | b   |",
-            "+---+-----+",
-            "| 1 | 0.0 |",
-            "+---+-----+",
-        ];
-        assert_batches_eq!(expected, &batches);
-        let metrics = opener.metrics.clone_inner();
-        assert_eq!(get_value(&metrics, "row_groups_pruned_statistics"), 0);
-        assert_eq!(get_value(&metrics, "pushdown_rows_pruned"), 2);
-    }
 }
diff --git a/datafusion/datasource-parquet/src/row_filter.rs 
b/datafusion/datasource-parquet/src/row_filter.rs
index de9fe181f8..059663c231 100644
--- a/datafusion/datasource-parquet/src/row_filter.rs
+++ b/datafusion/datasource-parquet/src/row_filter.rs
@@ -73,8 +73,7 @@ use parquet::file::metadata::ParquetMetaData;
 
 use datafusion_common::cast::as_boolean_array;
 use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, 
TreeNodeVisitor};
-use datafusion_common::Result;
-use datafusion_datasource::schema_adapter::{SchemaAdapterFactory, 
SchemaMapper};
+use datafusion_common::{internal_datafusion_err, Result};
 use datafusion_physical_expr::expressions::Column;
 use datafusion_physical_expr::utils::reassign_expr_columns;
 use datafusion_physical_expr::{split_conjunction, PhysicalExpr};
@@ -106,8 +105,6 @@ pub(crate) struct DatafusionArrowPredicate {
     rows_matched: metrics::Count,
     /// how long was spent evaluating this predicate
     time: metrics::Time,
-    /// used to perform type coercion while filtering rows
-    schema_mapper: Arc<dyn SchemaMapper>,
 }
 
 impl DatafusionArrowPredicate {
@@ -131,7 +128,6 @@ impl DatafusionArrowPredicate {
             rows_pruned,
             rows_matched,
             time,
-            schema_mapper: candidate.schema_mapper,
         })
     }
 }
@@ -142,8 +138,6 @@ impl ArrowPredicate for DatafusionArrowPredicate {
     }
 
     fn evaluate(&mut self, batch: RecordBatch) -> ArrowResult<BooleanArray> {
-        let batch = self.schema_mapper.map_batch(batch)?;
-
         // scoped timer updates on drop
         let mut timer = self.time.timer();
 
@@ -183,12 +177,8 @@ pub(crate) struct FilterCandidate {
     /// Can this filter use an index (e.g. a page index) to prune rows?
     can_use_index: bool,
     /// The projection to read from the file schema to get the columns
-    /// required to pass through a `SchemaMapper` to the table schema
-    /// upon which we then evaluate the filter expression.
+    /// required to evaluate the filter expression.
     projection: Vec<usize>,
-    ///  A `SchemaMapper` used to map batches read from the file schema to
-    /// the filter's projection of the table schema.
-    schema_mapper: Arc<dyn SchemaMapper>,
     /// The projected table schema that this filter references
     filter_schema: SchemaRef,
 }
@@ -198,42 +188,17 @@ pub(crate) struct FilterCandidate {
 /// This will do several things
 /// 1. Determine the columns required to evaluate the expression
 /// 2. Calculate data required to estimate the cost of evaluating the filter
-/// 3. Rewrite column expressions in the predicate which reference columns not
-///    in the particular file schema.
-///
-/// # Schema Rewrite
-///
-/// When parquet files are read in the context of "schema evolution" there are
-/// potentially wo schemas:
-///
-/// 1. The table schema (the columns of the table that the parquet file is 
part of)
-/// 2. The file schema (the columns actually in the parquet file)
 ///
-/// There are times when the table schema contains columns that are not in the
-/// file schema, such as when new columns have been added in new parquet files
-/// but old files do not have the columns.
-///
-/// When a file is missing a column from the table schema, the value of the
-/// missing column is filled in by a `SchemaAdapter` (by default as `NULL`).
-///
-/// When a predicate is pushed down to the parquet reader, the predicate is
-/// evaluated in the context of the file schema.
-/// For each predicate we build a filter schema which is the projection of the 
table
-/// schema that contains only the columns that this filter references.
-/// If any columns from the file schema are missing from a particular file 
they are
-/// added by the `SchemaAdapter`, by default as `NULL`.
+/// Note that this does *not* handle any adaptation of the data schema to the 
expression schema,
+/// it is assumed that the expression has already been adapted to the file 
schema before being passed in here,
+/// generally using 
[`PhysicalExprAdapter`](datafusion_physical_expr_adapter::PhysicalExprAdapter).
 struct FilterCandidateBuilder {
     expr: Arc<dyn PhysicalExpr>,
     /// The schema of this parquet file.
-    /// Columns may have different types from the table schema and there may be
-    /// columns in the file schema that are not in the table schema or columns 
that
-    /// are in the table schema that are not in the file schema.
     file_schema: SchemaRef,
     /// The schema of the table (merged schema) -- columns may be in different
     /// order than in the file and have columns that are not in the file schema
     table_schema: SchemaRef,
-    /// A `SchemaAdapterFactory` used to map the file schema to the table 
schema.
-    schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
 }
 
 impl FilterCandidateBuilder {
@@ -241,13 +206,11 @@ impl FilterCandidateBuilder {
         expr: Arc<dyn PhysicalExpr>,
         file_schema: Arc<Schema>,
         table_schema: Arc<Schema>,
-        schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
     ) -> Self {
         Self {
             expr,
             file_schema,
             table_schema,
-            schema_adapter_factory,
         }
     }
 
@@ -270,10 +233,24 @@ impl FilterCandidateBuilder {
                 .project(&required_indices_into_table_schema)?,
         );
 
-        let (schema_mapper, projection_into_file_schema) = self
-            .schema_adapter_factory
-            .create(Arc::clone(&projected_table_schema), self.table_schema)
-            .map_schema(&self.file_schema)?;
+        // Compute the projection into the file schema by matching column names
+        let mut projection_into_file_schema: Vec<usize> = 
projected_table_schema
+            .fields()
+            .iter()
+            .filter_map(|f| self.file_schema.index_of(f.name()).ok())
+            .collect();
+        // Sort and remove duplicates
+        let original_len = projection_into_file_schema.len();
+        projection_into_file_schema.sort_unstable();
+        projection_into_file_schema.dedup();
+        if projection_into_file_schema.len() < original_len {
+            // This should not happen, as we built projected_table_schema from
+            // the table schema which should not have duplicate column names.
+            return Err(internal_datafusion_err!(
+                "Duplicate column names found when building filter candidate: 
{:?}",
+                projection_into_file_schema
+            ));
+        }
 
         let required_bytes = size_of_columns(&projection_into_file_schema, 
metadata)?;
         let can_use_index = columns_sorted(&projection_into_file_schema, 
metadata)?;
@@ -283,7 +260,6 @@ impl FilterCandidateBuilder {
             required_bytes,
             can_use_index,
             projection: projection_into_file_schema,
-            schema_mapper: Arc::clone(&schema_mapper),
             filter_schema: Arc::clone(&projected_table_schema),
         }))
     }
@@ -429,7 +405,6 @@ pub fn build_row_filter(
     metadata: &ParquetMetaData,
     reorder_predicates: bool,
     file_metrics: &ParquetFileMetrics,
-    schema_adapter_factory: &Arc<dyn SchemaAdapterFactory>,
 ) -> Result<Option<RowFilter>> {
     let rows_pruned = &file_metrics.pushdown_rows_pruned;
     let rows_matched = &file_metrics.pushdown_rows_matched;
@@ -447,7 +422,6 @@ pub fn build_row_filter(
                 Arc::clone(expr),
                 Arc::clone(physical_file_schema),
                 Arc::clone(predicate_file_schema),
-                Arc::clone(schema_adapter_factory),
             )
             .build(metadata)
         })
@@ -511,9 +485,11 @@ mod test {
     use datafusion_common::ScalarValue;
 
     use arrow::datatypes::{Field, TimeUnit::Nanosecond};
-    use datafusion_datasource::schema_adapter::DefaultSchemaAdapterFactory;
     use datafusion_expr::{col, Expr};
     use datafusion_physical_expr::planner::logical2physical;
+    use datafusion_physical_expr_adapter::{
+        DefaultPhysicalExprAdapterFactory, PhysicalExprAdapterFactory,
+    };
     use datafusion_physical_plan::metrics::{Count, Time};
 
     use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
@@ -538,17 +514,12 @@ mod test {
         let expr = col("int64_list").is_not_null();
         let expr = logical2physical(&expr, &table_schema);
 
-        let schema_adapter_factory = Arc::new(DefaultSchemaAdapterFactory);
         let table_schema = Arc::new(table_schema.clone());
 
-        let candidate = FilterCandidateBuilder::new(
-            expr,
-            table_schema.clone(),
-            table_schema,
-            schema_adapter_factory,
-        )
-        .build(metadata)
-        .expect("building candidate");
+        let candidate =
+            FilterCandidateBuilder::new(expr, table_schema.clone(), 
table_schema)
+                .build(metadata)
+                .expect("building candidate");
 
         assert!(candidate.is_none());
     }
@@ -578,17 +549,16 @@ mod test {
             None,
         ));
         let expr = logical2physical(&expr, &table_schema);
-        let schema_adapter_factory = Arc::new(DefaultSchemaAdapterFactory);
+        let expr = DefaultPhysicalExprAdapterFactory {}
+            .create(Arc::new(table_schema.clone()), Arc::clone(&file_schema))
+            .rewrite(expr)
+            .expect("rewriting expression");
         let table_schema = Arc::new(table_schema.clone());
-        let candidate = FilterCandidateBuilder::new(
-            expr,
-            file_schema.clone(),
-            table_schema.clone(),
-            schema_adapter_factory,
-        )
-        .build(&metadata)
-        .expect("building candidate")
-        .expect("candidate expected");
+        let candidate =
+            FilterCandidateBuilder::new(expr, file_schema.clone(), 
table_schema.clone())
+                .build(&metadata)
+                .expect("building candidate")
+                .expect("candidate expected");
 
         let mut row_filter = DatafusionArrowPredicate::try_new(
             candidate,
@@ -619,16 +589,15 @@ mod test {
             None,
         ));
         let expr = logical2physical(&expr, &table_schema);
-        let schema_adapter_factory = Arc::new(DefaultSchemaAdapterFactory);
-        let candidate = FilterCandidateBuilder::new(
-            expr,
-            file_schema,
-            table_schema,
-            schema_adapter_factory,
-        )
-        .build(&metadata)
-        .expect("building candidate")
-        .expect("candidate expected");
+        // Rewrite the expression to add CastExpr for type coercion
+        let expr = DefaultPhysicalExprAdapterFactory {}
+            .create(table_schema.clone(), Arc::clone(&file_schema))
+            .rewrite(expr)
+            .expect("rewriting expression");
+        let candidate = FilterCandidateBuilder::new(expr, file_schema, 
table_schema)
+            .build(&metadata)
+            .expect("building candidate")
+            .expect("candidate expected");
 
         let mut row_filter = DatafusionArrowPredicate::try_new(
             candidate,
diff --git a/datafusion/datasource-parquet/src/source.rs 
b/datafusion/datasource-parquet/src/source.rs
index 940cf32901..1abe126369 100644
--- a/datafusion/datasource-parquet/src/source.rs
+++ b/datafusion/datasource-parquet/src/source.rs
@@ -32,9 +32,6 @@ use datafusion_common::config::EncryptionFactoryOptions;
 use datafusion_datasource::as_file_source;
 use datafusion_datasource::file_stream::FileOpener;
 use datafusion_datasource::projection::{ProjectionOpener, SplitProjection};
-use datafusion_datasource::schema_adapter::{
-    DefaultSchemaAdapterFactory, SchemaAdapterFactory,
-};
 
 use arrow::datatypes::TimeUnit;
 use datafusion_common::config::TableParquetOptions;
@@ -135,7 +132,7 @@ use parquet::encryption::decrypt::FileDecryptionProperties;
 ///   details.
 ///
 /// * Schema evolution: read parquet files with different schemas into a 
unified
-///   table schema. See [`SchemaAdapterFactory`] for more details.
+///   table schema. See [`DefaultPhysicalExprAdapterFactory`] for more details.
 ///
 /// * metadata_size_hint: controls the number of bytes read from the end of the
 ///   file in the initial I/O when the default [`ParquetFileReaderFactory`]. 
If a
@@ -262,12 +259,13 @@ use 
parquet::encryption::decrypt::FileDecryptionProperties;
 ///   [`Self::with_pushdown_filters`]).
 ///
 /// * Step 5: As each [`RecordBatch`] is read, it may be adapted by a
-///   [`SchemaAdapter`] to match the table schema. By default missing columns 
are
-///   filled with nulls, but this can be customized via 
[`SchemaAdapterFactory`].
+///   [`DefaultPhysicalExprAdapterFactory`] to match the table schema. By 
default missing columns are
+///   filled with nulls, but this can be customized via 
[`PhysicalExprAdapterFactory`].
 ///
 /// [`RecordBatch`]: arrow::record_batch::RecordBatch
 /// [`SchemaAdapter`]: datafusion_datasource::schema_adapter::SchemaAdapter
 /// [`ParquetMetadata`]: parquet::file::metadata::ParquetMetaData
+/// [`PhysicalExprAdapterFactory`]: 
datafusion_physical_expr_adapter::PhysicalExprAdapterFactory
 #[derive(Clone, Debug)]
 pub struct ParquetSource {
     /// Options for reading Parquet files
@@ -282,8 +280,6 @@ pub struct ParquetSource {
     pub(crate) predicate: Option<Arc<dyn PhysicalExpr>>,
     /// Optional user defined parquet file reader factory
     pub(crate) parquet_file_reader_factory: Option<Arc<dyn 
ParquetFileReaderFactory>>,
-    /// Optional user defined schema adapter
-    pub(crate) schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
     /// Batch size configuration
     pub(crate) batch_size: Option<usize>,
     /// Optional hint for the size of the parquet metadata
@@ -309,7 +305,6 @@ impl ParquetSource {
             metrics: ExecutionPlanMetricsSet::new(),
             predicate: None,
             parquet_file_reader_factory: None,
-            schema_adapter_factory: None,
             batch_size: None,
             metadata_size_hint: None,
             #[cfg(feature = "parquet_encryption")]
@@ -456,28 +451,6 @@ impl ParquetSource {
         self.table_parquet_options.global.max_predicate_cache_size
     }
 
-    /// Applies schema adapter factory from the FileScanConfig if present.
-    ///
-    /// # Arguments
-    /// * `conf` - FileScanConfig that may contain a schema adapter factory
-    /// # Returns
-    /// The converted FileSource with schema adapter factory applied if 
provided
-    pub fn apply_schema_adapter(
-        self,
-        conf: &FileScanConfig,
-    ) -> datafusion_common::Result<Arc<dyn FileSource>> {
-        let file_source: Arc<dyn FileSource> = self.into();
-
-        // If the FileScanConfig.file_source() has a schema adapter factory, 
apply it
-        if let Some(factory) = conf.file_source().schema_adapter_factory() {
-            file_source.with_schema_adapter_factory(
-                Arc::<dyn SchemaAdapterFactory>::clone(&factory),
-            )
-        } else {
-            Ok(file_source)
-        }
-    }
-
     #[cfg(feature = "parquet_encryption")]
     fn get_encryption_factory_with_config(
         &self,
@@ -526,43 +499,10 @@ impl FileSource for ParquetSource {
     ) -> datafusion_common::Result<Arc<dyn FileOpener>> {
         let split_projection = self.projection.clone();
 
-        let (expr_adapter_factory, schema_adapter_factory) = match (
-            base_config.expr_adapter_factory.as_ref(),
-            self.schema_adapter_factory.as_ref(),
-        ) {
-            (Some(expr_adapter_factory), Some(schema_adapter_factory)) => {
-                // Use both the schema adapter factory and the expr adapter 
factory.
-                // This results in the SchemaAdapter being used for 
projections (e.g. a column was selected that is a UInt32 in the file and a 
UInt64 in the table schema)
-                // but the PhysicalExprAdapterFactory being used for predicate 
pushdown and stats pruning.
-                (
-                    Some(Arc::clone(expr_adapter_factory)),
-                    Arc::clone(schema_adapter_factory),
-                )
-            }
-            (Some(expr_adapter_factory), None) => {
-                // If no custom schema adapter factory is provided but an expr 
adapter factory is provided use the expr adapter factory alongside the default 
schema adapter factory.
-                // This means that the PhysicalExprAdapterFactory will be used 
for predicate pushdown and stats pruning, while the default schema adapter 
factory will be used for projections.
-                (
-                    Some(Arc::clone(expr_adapter_factory)),
-                    Arc::new(DefaultSchemaAdapterFactory) as _,
-                )
-            }
-            (None, Some(schema_adapter_factory)) => {
-                // If a custom schema adapter factory is provided but no expr 
adapter factory is provided use the custom SchemaAdapter for both projections 
and predicate pushdown.
-                // This maximizes compatibility with existing code that uses 
the SchemaAdapter API and did not explicitly opt into the 
PhysicalExprAdapterFactory API.
-                (None, Arc::clone(schema_adapter_factory) as _)
-            }
-            (None, None) => {
-                // If no custom schema adapter factory or expr adapter factory 
is provided, use the default schema adapter factory and the default physical 
expr adapter factory.
-                // This means that the default SchemaAdapter will be used for 
projections (e.g. a column was selected that is a UInt32 in the file and a 
UInt64 in the table schema)
-                // and the default PhysicalExprAdapterFactory will be used for 
predicate pushdown and stats pruning.
-                // This is the default behavior with not customization and 
means that most users of DataFusion will be cut over to the new 
PhysicalExprAdapterFactory API.
-                (
-                    Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _),
-                    Arc::new(DefaultSchemaAdapterFactory) as _,
-                )
-            }
-        };
+        let expr_adapter_factory = base_config
+            .expr_adapter_factory
+            .clone()
+            .or_else(|| Some(Arc::new(DefaultPhysicalExprAdapterFactory) as 
_));
 
         let parquet_file_reader_factory =
             self.parquet_file_reader_factory.clone().unwrap_or_else(|| {
@@ -604,7 +544,6 @@ impl FileSource for ParquetSource {
             enable_page_index: self.enable_page_index(),
             enable_bloom_filter: self.bloom_filter_on_read(),
             enable_row_group_stats_pruning: 
self.table_parquet_options.global.pruning,
-            schema_adapter_factory,
             coerce_int96,
             #[cfg(feature = "parquet_encryption")]
             file_decryption_properties,
@@ -780,20 +719,6 @@ impl FileSource for ParquetSource {
         )
         .with_updated_node(source))
     }
-
-    fn with_schema_adapter_factory(
-        &self,
-        schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
-    ) -> datafusion_common::Result<Arc<dyn FileSource>> {
-        Ok(Arc::new(Self {
-            schema_adapter_factory: Some(schema_adapter_factory),
-            ..self.clone()
-        }))
-    }
-
-    fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
-        self.schema_adapter_factory.clone()
-    }
 }
 
 #[cfg(test)]
diff --git a/datafusion/physical-expr-adapter/src/schema_rewriter.rs 
b/datafusion/physical-expr-adapter/src/schema_rewriter.rs
index 61cc97dae3..daa4e6203c 100644
--- a/datafusion/physical-expr-adapter/src/schema_rewriter.rs
+++ b/datafusion/physical-expr-adapter/src/schema_rewriter.rs
@@ -23,12 +23,14 @@ use arrow::compute::can_cast_types;
 use arrow::datatypes::{DataType, FieldRef, Schema, SchemaRef};
 use datafusion_common::{
     exec_err,
+    nested_struct::validate_struct_compatibility,
     tree_node::{Transformed, TransformedResult, TreeNode},
     Result, ScalarValue,
 };
 use datafusion_functions::core::getfield::GetFieldFunc;
+use datafusion_physical_expr::expressions::CastColumnExpr;
 use datafusion_physical_expr::{
-    expressions::{self, CastExpr, Column},
+    expressions::{self, Column},
     ScalarFunctionExpr,
 };
 use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
@@ -380,10 +382,8 @@ impl<'a> DefaultPhysicalExprAdapterRewriter<'a> {
                         column.name()
                     );
                     }
-                    // If the column is missing from the physical schema fill 
it in with nulls as `SchemaAdapter` would do.
-                    // TODO: do we need to sync this with what the 
`SchemaAdapter` actually does?
-                    // While the default implementation fills in nulls in 
theory a custom `SchemaAdapter` could do something else!
-                    // See https://github.com/apache/datafusion/issues/16527
+                    // If the column is missing from the physical schema fill 
it in with nulls as `SchemaAdapter` used to do.
+                    // If users want a different behavior they need to provide 
a custom `PhysicalExprAdapter` implementation.
                     let null_value =
                         ScalarValue::Null.cast_to(logical_field.data_type())?;
                     return Ok(Transformed::yes(expressions::lit(null_value)));
@@ -413,20 +413,34 @@ impl<'a> DefaultPhysicalExprAdapterRewriter<'a> {
         // TODO: add optimization to move the cast from the column to literal 
expressions in the case of `col = 123`
         // since that's much cheaper to evalaute.
         // See 
https://github.com/apache/datafusion/issues/15780#issuecomment-2824716928
-        let is_compatible =
-            can_cast_types(physical_field.data_type(), 
logical_field.data_type());
-        if !is_compatible {
-            return exec_err!(
-                "Cannot cast column '{}' from '{}' (physical data type) to 
'{}' (logical data type)",
-                column.name(),
-                physical_field.data_type(),
-                logical_field.data_type()
-            );
+        //
+        // For struct types, use validate_struct_compatibility which handles:
+        // - Missing fields in source (filled with nulls)
+        // - Extra fields in source (ignored)
+        // - Recursive validation of nested structs
+        // For non-struct types, use Arrow's can_cast_types
+        match (physical_field.data_type(), logical_field.data_type()) {
+            (DataType::Struct(physical_fields), 
DataType::Struct(logical_fields)) => {
+                validate_struct_compatibility(physical_fields, 
logical_fields)?;
+            }
+            _ => {
+                let is_compatible =
+                    can_cast_types(physical_field.data_type(), 
logical_field.data_type());
+                if !is_compatible {
+                    return exec_err!(
+                        "Cannot cast column '{}' from '{}' (physical data 
type) to '{}' (logical data type)",
+                        column.name(),
+                        physical_field.data_type(),
+                        logical_field.data_type()
+                    );
+                }
+            }
         }
 
-        let cast_expr = Arc::new(CastExpr::new(
+        let cast_expr = Arc::new(CastColumnExpr::new(
             Arc::new(column),
-            logical_field.data_type().clone(),
+            Arc::new(physical_field.clone()),
+            Arc::new(logical_field.clone()),
             None,
         ));
 
@@ -444,11 +458,14 @@ impl<'a> DefaultPhysicalExprAdapterRewriter<'a> {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use arrow::array::{RecordBatch, RecordBatchOptions};
-    use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+    use arrow::array::{
+        BooleanArray, Int32Array, Int64Array, RecordBatch, RecordBatchOptions,
+        StringArray, StringViewArray, StructArray,
+    };
+    use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
     use datafusion_common::{assert_contains, record_batch, Result, 
ScalarValue};
     use datafusion_expr::Operator;
-    use datafusion_physical_expr::expressions::{col, lit, CastExpr, Column, 
Literal};
+    use datafusion_physical_expr::expressions::{col, lit, Column, Literal};
     use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
     use itertools::Itertools;
     use std::sync::Arc;
@@ -479,7 +496,7 @@ mod tests {
         let result = adapter.rewrite(column_expr).unwrap();
 
         // Should be wrapped in a cast expression
-        assert!(result.as_any().downcast_ref::<CastExpr>().is_some());
+        assert!(result.as_any().downcast_ref::<CastColumnExpr>().is_some());
     }
 
     #[test]
@@ -510,9 +527,10 @@ mod tests {
         println!("Rewritten expression: {result}");
 
         let expected = expressions::BinaryExpr::new(
-            Arc::new(CastExpr::new(
+            Arc::new(CastColumnExpr::new(
                 Arc::new(Column::new("a", 0)),
-                DataType::Int64,
+                Arc::new(Field::new("a", DataType::Int32, false)),
+                Arc::new(Field::new("a", DataType::Int64, false)),
                 None,
             )),
             Operator::Plus,
@@ -554,7 +572,11 @@ mod tests {
         let column_expr = Arc::new(Column::new("data", 0));
 
         let error_msg = adapter.rewrite(column_expr).unwrap_err().to_string();
-        assert_contains!(error_msg, "Cannot cast column 'data'");
+        // validate_struct_compatibility provides more specific error about 
which field can't be cast
+        assert_contains!(
+            error_msg,
+            "Cannot cast struct field 'field1' from type Binary to type Int32"
+        );
     }
 
     #[test]
@@ -589,15 +611,30 @@ mod tests {
 
         let result = adapter.rewrite(column_expr).unwrap();
 
-        let expected = Arc::new(CastExpr::new(
+        let expected = Arc::new(CastColumnExpr::new(
             Arc::new(Column::new("data", 0)),
-            DataType::Struct(
-                vec![
-                    Field::new("id", DataType::Int64, false),
-                    Field::new("name", DataType::Utf8View, true),
-                ]
-                .into(),
-            ),
+            Arc::new(Field::new(
+                "data",
+                DataType::Struct(
+                    vec![
+                        Field::new("id", DataType::Int32, false),
+                        Field::new("name", DataType::Utf8, true),
+                    ]
+                    .into(),
+                ),
+                false,
+            )),
+            Arc::new(Field::new(
+                "data",
+                DataType::Struct(
+                    vec![
+                        Field::new("id", DataType::Int64, false),
+                        Field::new("name", DataType::Utf8View, true),
+                    ]
+                    .into(),
+                ),
+                false,
+            )),
             None,
         )) as Arc<dyn PhysicalExpr>;
 
@@ -821,6 +858,118 @@ mod tests {
         );
     }
 
+    /// Test that struct columns are properly adapted including:
+    /// - Type casting of subfields (Int32 -> Int64, Utf8 -> Utf8View)
+    /// - Missing fields in logical schema are filled with nulls
+    #[test]
+    fn test_adapt_struct_batches() {
+        // Physical struct: {id: Int32, name: Utf8}
+        let physical_struct_fields: Fields = vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("name", DataType::Utf8, true),
+        ]
+        .into();
+
+        let struct_array = StructArray::new(
+            physical_struct_fields.clone(),
+            vec![
+                Arc::new(Int32Array::from(vec![1, 2, 3])) as _,
+                Arc::new(StringArray::from(vec![
+                    Some("alice"),
+                    None,
+                    Some("charlie"),
+                ])) as _,
+            ],
+            None,
+        );
+
+        let physical_schema = Arc::new(Schema::new(vec![Field::new(
+            "data",
+            DataType::Struct(physical_struct_fields),
+            false,
+        )]));
+
+        let physical_batch = RecordBatch::try_new(
+            Arc::clone(&physical_schema),
+            vec![Arc::new(struct_array)],
+        )
+        .unwrap();
+
+        // Logical struct: {id: Int64, name: Utf8View, extra: Boolean}
+        // - id: cast from Int32 to Int64
+        // - name: cast from Utf8 to Utf8View
+        // - extra: missing from physical, should be filled with nulls
+        let logical_struct_fields: Fields = vec![
+            Field::new("id", DataType::Int64, false),
+            Field::new("name", DataType::Utf8View, true),
+            Field::new("extra", DataType::Boolean, true), // New field, not in 
physical
+        ]
+        .into();
+
+        let logical_schema = Arc::new(Schema::new(vec![Field::new(
+            "data",
+            DataType::Struct(logical_struct_fields),
+            false,
+        )]));
+
+        let projection = vec![col("data", &logical_schema).unwrap()];
+
+        let factory = DefaultPhysicalExprAdapterFactory;
+        let adapter =
+            factory.create(Arc::clone(&logical_schema), 
Arc::clone(&physical_schema));
+
+        let adapted_projection = projection
+            .into_iter()
+            .map(|expr| adapter.rewrite(expr).unwrap())
+            .collect_vec();
+
+        let adapted_schema = Arc::new(Schema::new(
+            adapted_projection
+                .iter()
+                .map(|expr| expr.return_field(&physical_schema).unwrap())
+                .collect_vec(),
+        ));
+
+        let res = batch_project(
+            adapted_projection,
+            &physical_batch,
+            Arc::clone(&adapted_schema),
+        )
+        .unwrap();
+
+        assert_eq!(res.num_columns(), 1);
+
+        let result_struct = res
+            .column(0)
+            .as_any()
+            .downcast_ref::<StructArray>()
+            .unwrap();
+
+        // Verify id field is cast to Int64
+        let id_col = result_struct.column_by_name("id").unwrap();
+        assert_eq!(id_col.data_type(), &DataType::Int64);
+        let id_values = id_col.as_any().downcast_ref::<Int64Array>().unwrap();
+        assert_eq!(
+            id_values.iter().collect_vec(),
+            vec![Some(1), Some(2), Some(3)]
+        );
+
+        // Verify name field is cast to Utf8View
+        let name_col = result_struct.column_by_name("name").unwrap();
+        assert_eq!(name_col.data_type(), &DataType::Utf8View);
+        let name_values = 
name_col.as_any().downcast_ref::<StringViewArray>().unwrap();
+        assert_eq!(
+            name_values.iter().collect_vec(),
+            vec![Some("alice"), None, Some("charlie")]
+        );
+
+        // Verify extra field (missing from physical) is filled with nulls
+        let extra_col = result_struct.column_by_name("extra").unwrap();
+        assert_eq!(extra_col.data_type(), &DataType::Boolean);
+        let extra_values = 
extra_col.as_any().downcast_ref::<BooleanArray>().unwrap();
+        assert_eq!(extra_values.iter().collect_vec(), vec![None, None, None]);
+    }
+
     #[test]
     fn test_try_rewrite_struct_field_access() {
         // Test the core logic of try_rewrite_struct_field_access
diff --git a/datafusion/physical-expr/src/expressions/cast.rs 
b/datafusion/physical-expr/src/expressions/cast.rs
index 0419161b53..a368aafbc6 100644
--- a/datafusion/physical-expr/src/expressions/cast.rs
+++ b/datafusion/physical-expr/src/expressions/cast.rs
@@ -740,6 +740,9 @@ mod tests {
         Ok(())
     }
 
+    // Tests for timestamp timezone casting have been moved to timestamps.slt
+    // See the "Casting between timestamp with and without timezone" section
+
     #[test]
     fn invalid_cast() {
         // Ensure a useful error happens at plan time if invalid casts are used
diff --git a/datafusion/physical-expr/src/projection.rs 
b/datafusion/physical-expr/src/projection.rs
index 3d6740510b..4688ac0e1b 100644
--- a/datafusion/physical-expr/src/projection.rs
+++ b/datafusion/physical-expr/src/projection.rs
@@ -240,6 +240,49 @@ impl ProjectionExprs {
         self.exprs.iter().map(|e| Arc::clone(&e.expr))
     }
 
+    /// Apply a fallible transformation to the [`PhysicalExpr`] of each 
projection.
+    ///
+    /// This method transforms the expression in each [`ProjectionExpr`] while 
preserving
+    /// the alias. This is useful for rewriting expressions, such as when 
adapting
+    /// expressions to a different schema.
+    ///
+    /// # Example
+    ///
+    /// ```rust
+    /// use std::sync::Arc;
+    /// use arrow::datatypes::{DataType, Field, Schema};
+    /// use datafusion_common::Result;
+    /// use datafusion_physical_expr::expressions::Column;
+    /// use datafusion_physical_expr::projection::ProjectionExprs;
+    /// use datafusion_physical_expr::PhysicalExpr;
+    ///
+    /// // Create a schema and projection
+    /// let schema = Arc::new(Schema::new(vec![
+    ///     Field::new("a", DataType::Int32, false),
+    ///     Field::new("b", DataType::Int32, false),
+    /// ]));
+    /// let projection = ProjectionExprs::from_indices(&[0, 1], &schema);
+    ///
+    /// // Transform each expression (this example just clones them)
+    /// let transformed = projection.try_map_exprs(|expr| Ok(expr))?;
+    /// assert_eq!(transformed.as_ref().len(), 2);
+    /// # Ok::<(), datafusion_common::DataFusionError>(())
+    /// ```
+    pub fn try_map_exprs<F>(self, mut f: F) -> Result<Self>
+    where
+        F: FnMut(Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>>,
+    {
+        let exprs = self
+            .exprs
+            .into_iter()
+            .map(|mut proj| {
+                proj.expr = f(proj.expr)?;
+                Ok(proj)
+            })
+            .collect::<Result<Vec<_>>>()?;
+        Ok(Self::new(exprs))
+    }
+
     /// Apply another projection on top of this projection, returning the 
combined projection.
     /// For example, if this projection is `SELECT c@2 AS x, b@1 AS y, a@0 as 
z` and the other projection is `SELECT x@0 + 1 AS c1, y@1 + z@2 as c2`,
     /// we return a projection equivalent to `SELECT c@2 + 1 AS c1, b@1 + a@0 
as c2`.
diff --git a/datafusion/pruning/src/pruning_predicate.rs 
b/datafusion/pruning/src/pruning_predicate.rs
index b9bbaea45a..4110391514 100644
--- a/datafusion/pruning/src/pruning_predicate.rs
+++ b/datafusion/pruning/src/pruning_predicate.rs
@@ -43,6 +43,7 @@ use datafusion_common::{
     ScalarValue,
 };
 use datafusion_expr_common::operator::Operator;
+use datafusion_physical_expr::expressions::CastColumnExpr;
 use datafusion_physical_expr::utils::{collect_columns, Guarantee, 
LiteralGuarantee};
 use datafusion_physical_expr::{expressions as phys_expr, PhysicalExprRef};
 use datafusion_physical_expr_common::physical_expr::snapshot_physical_expr;
@@ -1105,6 +1106,20 @@ fn rewrite_expr_to_prunable(
             None,
         ));
         Ok((left, op, right))
+    } else if let Some(cast_col) = 
column_expr_any.downcast_ref::<CastColumnExpr>() {
+        // `cast_column(col) op lit()` - same as CastExpr but uses 
CastColumnExpr
+        let arrow_schema = schema.as_arrow();
+        let from_type = cast_col.expr().data_type(arrow_schema)?;
+        let to_type = cast_col.target_field().data_type();
+        verify_support_type_for_prune(&from_type, to_type)?;
+        let (left, op, right) =
+            rewrite_expr_to_prunable(cast_col.expr(), op, scalar_expr, 
schema)?;
+        // Predicate pruning / statistics generally don't support struct 
columns yet.
+        // In the future we may want to support pruning on nested fields, in 
which case we probably need to
+        // do something more sophisticated here.
+        // But for now since we don't support pruning on nested fields, we can 
just cast to the target type directly.
+        let left = Arc::new(phys_expr::CastExpr::new(left, to_type.clone(), 
None));
+        Ok((left, op, right))
     } else if let Some(try_cast) =
         column_expr_any.downcast_ref::<phys_expr::TryCastExpr>()
     {
diff --git a/datafusion/sqllogictest/test_files/timestamps.slt 
b/datafusion/sqllogictest/test_files/timestamps.slt
index 3a1a26257b..3bf11c92e2 100644
--- a/datafusion/sqllogictest/test_files/timestamps.slt
+++ b/datafusion/sqllogictest/test_files/timestamps.slt
@@ -3702,6 +3702,76 @@ FROM ts_data_micros_kolkata
 2020-09-08T18:12:29.190+05:30
 2020-09-08T17:12:29.190+05:30
 
+
+##########
+## Casting between timestamp with and without timezone
+##########
+
+# Test casting from Timestamp(Nanosecond, Some("UTC")) to 
Timestamp(Nanosecond, None)
+# Verifies that the underlying nanosecond values are preserved when removing 
timezone
+
+# Verify input type
+query T
+SELECT arrow_typeof(arrow_cast(1, 'Timestamp(Nanosecond, Some("UTC"))'));
+----
+Timestamp(ns, "UTC")
+
+# Verify output type after casting
+query T
+SELECT arrow_typeof(arrow_cast(arrow_cast(1, 'Timestamp(Nanosecond, 
Some("UTC"))'), 'Timestamp(Nanosecond, None)'));
+----
+Timestamp(ns)
+
+# Verify values are preserved when casting from timestamp with timezone to 
timestamp without timezone
+query P rowsort
+SELECT arrow_cast(column1, 'Timestamp(Nanosecond, None)')
+FROM (VALUES
+  (arrow_cast(1, 'Timestamp(Nanosecond, Some("UTC"))')),
+  (arrow_cast(2, 'Timestamp(Nanosecond, Some("UTC"))')),
+  (arrow_cast(3, 'Timestamp(Nanosecond, Some("UTC"))')),
+  (arrow_cast(4, 'Timestamp(Nanosecond, Some("UTC"))')),
+  (arrow_cast(5, 'Timestamp(Nanosecond, Some("UTC"))'))
+) t;
+----
+1970-01-01T00:00:00.000000001
+1970-01-01T00:00:00.000000002
+1970-01-01T00:00:00.000000003
+1970-01-01T00:00:00.000000004
+1970-01-01T00:00:00.000000005
+
+# Test casting from Timestamp(Nanosecond, None) to Timestamp(Nanosecond, 
Some("UTC"))
+# Verifies that the underlying nanosecond values are preserved when adding 
timezone
+
+# Verify input type
+query T
+SELECT arrow_typeof(arrow_cast(1, 'Timestamp(Nanosecond, None)'));
+----
+Timestamp(ns)
+
+# Verify output type after casting
+query T
+SELECT arrow_typeof(arrow_cast(arrow_cast(1, 'Timestamp(Nanosecond, None)'), 
'Timestamp(Nanosecond, Some("UTC"))'));
+----
+Timestamp(ns, "UTC")
+
+# Verify values are preserved when casting from timestamp without timezone to 
timestamp with timezone
+query P rowsort
+SELECT arrow_cast(column1, 'Timestamp(Nanosecond, Some("UTC"))')
+FROM (VALUES
+  (arrow_cast(1, 'Timestamp(Nanosecond, None)')),
+  (arrow_cast(2, 'Timestamp(Nanosecond, None)')),
+  (arrow_cast(3, 'Timestamp(Nanosecond, None)')),
+  (arrow_cast(4, 'Timestamp(Nanosecond, None)')),
+  (arrow_cast(5, 'Timestamp(Nanosecond, None)'))
+) t;
+----
+1970-01-01T00:00:00.000000001Z
+1970-01-01T00:00:00.000000002Z
+1970-01-01T00:00:00.000000003Z
+1970-01-01T00:00:00.000000004Z
+1970-01-01T00:00:00.000000005Z
+
+
 ##########
 ## Common timestamp data
 ##########
diff --git a/docs/source/library-user-guide/upgrading.md 
b/docs/source/library-user-guide/upgrading.md
index 00e55ac9e7..caab9ad4e7 100644
--- a/docs/source/library-user-guide/upgrading.md
+++ b/docs/source/library-user-guide/upgrading.md
@@ -369,7 +369,16 @@ let config = FileScanConfigBuilder::new(url, source)
     .build();
 ```
 
-**Handling projections in `FileSource`:**
+### `SchemaAdapterFactory` Fully Removed from Parquet
+
+Following the deprecation announced in [DataFusion 
49.0.0](#deprecating-schemaadapterfactory-and-schemaadapter), 
`SchemaAdapterFactory` has been fully removed from Parquet scanning. This 
applies to both:
+
+- **Predicate pushdown / row filtering** (deprecated in 49.0.0)
+- **Projections** (newly removed in 52.0.0)
+
+If you were using a custom `SchemaAdapterFactory` for schema adaptation (e.g., 
default column values, type coercion), you should now implement 
`PhysicalExprAdapterFactory` instead.
+
+See the [default column values 
example](https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/custom_data_source/default_column_values.rs)
 for how to implement a custom `PhysicalExprAdapterFactory`.
 
 ### `PhysicalOptimizerRule::optimize` deprecated in favor of `optimize_plan`
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to