This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 900279c5d1 Implement schema adapter support for FileSource and add integration tests (#16148)
900279c5d1 is described below

commit 900279c5d189a62a5585210c64ab6868af4640e5
Author: kosiew <kos...@gmail.com>
AuthorDate: Fri May 30 19:45:58 2025 +0800

    Implement schema adapter support for FileSource and add integration tests (#16148)
    
    * Implement schema adapter factory support for file sources
    
    * Add schema adapter factory support to file sources
    
    * Add SchemaAdapterFactory import to file source module
    
    * Add schema_adapter_factory field to JsonOpener and JsonSource structs
    
    * Add missing import for as_file_source in source.rs
    
    * Fix formatting in ArrowSource implementation by removing extra newlines
    
    * Add integration and unit tests for schema adapter factory functionality
    
    * fix tests
    
    * Refactor adapt method signature and improve test assertions for schema adapter factory
    
    * Simplify constructor in TestSource by removing redundant function definition
    
    * Remove redundant import of SchemaAdapterFactory in util.rs
    
    * fix tests: refactor schema_adapter_factory methods in TestSource for improved clarity
    
    * feat: add macro for schema adapter methods in FileSource implementation
    
    * feat: use macro to implement schema adapter methods for various FileSource implementations
    
    * refactor: clean up unused schema adapter factory methods in ParquetSource
    
    * feat: add macro for generating schema adapter methods in FileSource implementations
    
    * refactor: re-export impl_schema_adapter_methods from crate root
    
    * refactor: update macro usage and documentation for schema adapter methods
    
    * refactor: clean up import statements in datasource module
    
    * refactor: reorganize and clean up import statements in util.rs
    
    * Resolve merge conflict
    
    * Export macro with local inner macros for improved encapsulation
    
    * fix clippy error
    
    * fix doc tests
    
    * fix CI error
    
    * Add metrics initialization to TestSource constructor
    
    * Add comment for test_multi_source_schema_adapter_reuse
    
    * reduce test files, move non-redundant tests, consolidate in one file
    
    * test_schema_adapter - add comments
    
    * remove redundant tests
    
    * Refactor schema adapter application to use ParquetSource method directly
    
    * Refactor apply_schema_adapter usage to call method directly on ParquetSource
    
    * remove macro
    
    * Revert "remove macro"
    
    This reverts commit 208b1cc996bfc77171ce5eaef50ca3fd296c9d40.
    
    * FileSource - provide default implementations for schema_adapter_factory methods
    
    * Revert "FileSource - provide default implementations for 
schema_adapter_factory methods"
    
    This reverts commit ee07b69fbcc0a41bd2a859f5f0fa328b35d4e69d.
    
    * Remove unused import of SchemaAdapterFactory from file_format.rs
    
    * Refactor imports in apply_schema_adapter_tests.rs for improved readability
---
 datafusion/core/src/datasource/mod.rs              |  39 ++--
 .../src/datasource/physical_plan/arrow_file.rs     |  11 +
 .../schema_adapter_integration_tests.rs            | 260 +++++++++++++++++++++
 .../physical_optimizer/filter_pushdown/util.rs     |  50 ++--
 datafusion/core/tests/test_adapter_updated.rs      | 214 +++++++++++++++++
 datafusion/datasource-avro/src/source.rs           |   5 +
 datafusion/datasource-csv/src/source.rs            |  13 +-
 datafusion/datasource-json/src/source.rs           |  14 +-
 datafusion/datasource-parquet/src/file_format.rs   |   4 +-
 datafusion/datasource-parquet/src/source.rs        |  49 ++--
 .../tests/apply_schema_adapter_tests.rs            | 206 ++++++++++++++++
 datafusion/datasource/src/file.rs                  |  25 ++
 datafusion/datasource/src/file_scan_config.rs      |  12 +-
 datafusion/datasource/src/macros.rs                | 145 ++++++++++++
 datafusion/datasource/src/mod.rs                   |   2 +
 datafusion/datasource/src/test_util.rs             |   4 +
 16 files changed, 975 insertions(+), 78 deletions(-)
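
A minimal usage sketch of the new API follows. It assumes `DefaultSchemaAdapterFactory` derives `Default` (as in `datafusion_datasource::schema_adapter`); any custom `SchemaAdapterFactory` can be substituted. Note that `with_schema_adapter_factory` is now a `FileSource` trait method returning `Arc<dyn FileSource>`, so call sites no longer wrap the source in `Arc::new` themselves:

    use std::sync::Arc;
    use datafusion_datasource::file::FileSource;
    use datafusion_datasource::schema_adapter::DefaultSchemaAdapterFactory;
    use datafusion_datasource_parquet::source::ParquetSource;

    fn main() {
        // Attach a schema adapter factory; the trait method already returns
        // Arc<dyn FileSource>, so no outer Arc::new is needed.
        let source: Arc<dyn FileSource> = ParquetSource::default()
            .with_schema_adapter_factory(Arc::new(DefaultSchemaAdapterFactory::default()));

        // The factory can be read back through the same trait.
        assert!(source.schema_adapter_factory().is_some());
    }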

diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs
index 674541ff73..f0c6771515 100644
--- a/datafusion/core/src/datasource/mod.rs
+++ b/datafusion/core/src/datasource/mod.rs
@@ -51,28 +51,26 @@ pub use datafusion_physical_expr::create_ordering;
 #[cfg(all(test, feature = "parquet"))]
 mod tests {
 
-    use crate::prelude::SessionContext;
-
-    use std::fs;
-    use std::sync::Arc;
-
-    use arrow::array::{Int32Array, StringArray};
-    use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
-    use arrow::record_batch::RecordBatch;
-    use datafusion_common::test_util::batches_to_sort_string;
-    use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
     use datafusion_datasource::schema_adapter::{
        DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory, SchemaMapper,
     };
-    use datafusion_datasource::PartitionedFile;
-    use datafusion_datasource_parquet::source::ParquetSource;
-
-    use datafusion_common::record_batch;
 
-    use ::object_store::path::Path;
-    use ::object_store::ObjectMeta;
-    use datafusion_datasource::source::DataSourceExec;
+    use crate::prelude::SessionContext;
+    use arrow::{
+        array::{Int32Array, StringArray},
+        datatypes::{DataType, Field, Schema, SchemaRef},
+        record_batch::RecordBatch,
+    };
+    use datafusion_common::{record_batch, test_util::batches_to_sort_string};
+    use datafusion_datasource::{
+        file::FileSource, file_scan_config::FileScanConfigBuilder,
+        source::DataSourceExec, PartitionedFile,
+    };
+    use datafusion_datasource_parquet::source::ParquetSource;
+    use datafusion_execution::object_store::ObjectStoreUrl;
     use datafusion_physical_plan::collect;
+    use object_store::{path::Path, ObjectMeta};
+    use std::{fs, sync::Arc};
     use tempfile::TempDir;
 
     #[tokio::test]
@@ -81,7 +79,6 @@ mod tests {
        // record batches returned from parquet.  This can be useful for schema evolution
         // where older files may not have all columns.
 
-        use datafusion_execution::object_store::ObjectStoreUrl;
         let tmp_dir = TempDir::new().unwrap();
         let table_dir = tmp_dir.path().join("parquet_test");
         fs::DirBuilder::new().create(table_dir.as_path()).unwrap();
@@ -124,10 +121,8 @@ mod tests {
         let f2 = Field::new("extra_column", DataType::Utf8, true);
 
         let schema = Arc::new(Schema::new(vec![f1.clone(), f2.clone()]));
-        let source = Arc::new(
-            ParquetSource::default()
-                .with_schema_adapter_factory(Arc::new(TestSchemaAdapterFactory {})),
-        );
+        let source = ParquetSource::default()
+            .with_schema_adapter_factory(Arc::new(TestSchemaAdapterFactory {}));
         let base_conf = FileScanConfigBuilder::new(
             ObjectStoreUrl::local_filesystem(),
             schema,
diff --git a/datafusion/core/src/datasource/physical_plan/arrow_file.rs b/datafusion/core/src/datasource/physical_plan/arrow_file.rs
index 897d1c0447..6de72aa8ff 100644
--- a/datafusion/core/src/datasource/physical_plan/arrow_file.rs
+++ b/datafusion/core/src/datasource/physical_plan/arrow_file.rs
@@ -20,6 +20,8 @@ use std::sync::Arc;
 
 use crate::datasource::physical_plan::{FileMeta, FileOpenFuture, FileOpener};
 use crate::error::Result;
+use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
+use datafusion_datasource::{as_file_source, impl_schema_adapter_methods};
 
 use arrow::buffer::Buffer;
 use arrow::datatypes::SchemaRef;
@@ -39,6 +41,13 @@ use object_store::{GetOptions, GetRange, GetResultPayload, ObjectStore};
 pub struct ArrowSource {
     metrics: ExecutionPlanMetricsSet,
     projected_statistics: Option<Statistics>,
+    schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
+}
+
+impl From<ArrowSource> for Arc<dyn FileSource> {
+    fn from(source: ArrowSource) -> Self {
+        as_file_source(source)
+    }
 }
 
 impl FileSource for ArrowSource {
@@ -89,6 +98,8 @@ impl FileSource for ArrowSource {
     fn file_type(&self) -> &str {
         "arrow"
     }
+
+    impl_schema_adapter_methods!();
 }
 
 /// The struct arrow that implements `[FileOpener]` trait
diff --git a/datafusion/core/tests/integration_tests/schema_adapter_integration_tests.rs b/datafusion/core/tests/integration_tests/schema_adapter_integration_tests.rs
new file mode 100644
index 0000000000..38c2ee582a
--- /dev/null
+++ b/datafusion/core/tests/integration_tests/schema_adapter_integration_tests.rs
@@ -0,0 +1,260 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Integration test for schema adapter factory functionality
+
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use arrow::record_batch::RecordBatch;
+use datafusion::datasource::object_store::ObjectStoreUrl;
+use datafusion::datasource::physical_plan::arrow_file::ArrowSource;
+use datafusion::prelude::*;
+use datafusion_common::Result;
+use datafusion_datasource::file::FileSource;
+use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
+use datafusion_datasource::schema_adapter::{SchemaAdapter, SchemaAdapterFactory};
+use datafusion_datasource::source::DataSourceExec;
+use datafusion_datasource::PartitionedFile;
+use std::sync::Arc;
+use tempfile::TempDir;
+
+#[cfg(feature = "parquet")]
+use datafusion_datasource_parquet::ParquetSource;
+#[cfg(feature = "parquet")]
+use parquet::arrow::ArrowWriter;
+#[cfg(feature = "parquet")]
+use parquet::file::properties::WriterProperties;
+
+#[cfg(feature = "csv")]
+use datafusion_datasource_csv::CsvSource;
+
+/// A schema adapter factory that transforms column names to uppercase
+#[derive(Debug)]
+struct UppercaseAdapterFactory {}
+
+impl SchemaAdapterFactory for UppercaseAdapterFactory {
+    fn create(&self, schema: &Schema) -> Result<Box<dyn SchemaAdapter>> {
+        Ok(Box::new(UppercaseAdapter {
+            input_schema: Arc::new(schema.clone()),
+        }))
+    }
+}
+
+/// Schema adapter that transforms column names to uppercase
+#[derive(Debug)]
+struct UppercaseAdapter {
+    input_schema: SchemaRef,
+}
+
+impl SchemaAdapter for UppercaseAdapter {
+    fn adapt(&self, record_batch: RecordBatch) -> Result<RecordBatch> {
+        // In a real adapter, we might transform the data too
+        // For this test, we're just passing through the batch
+        Ok(record_batch)
+    }
+
+    fn output_schema(&self) -> SchemaRef {
+        let fields = self
+            .input_schema
+            .fields()
+            .iter()
+            .map(|f| {
+                Field::new(
+                    f.name().to_uppercase().as_str(),
+                    f.data_type().clone(),
+                    f.is_nullable(),
+                )
+            })
+            .collect();
+
+        Arc::new(Schema::new(fields))
+    }
+}
+
+#[cfg(feature = "parquet")]
+#[tokio::test]
+async fn test_parquet_integration_with_schema_adapter() -> Result<()> {
+    // Create a temporary directory for our test file
+    let tmp_dir = TempDir::new()?;
+    let file_path = tmp_dir.path().join("test.parquet");
+    let file_path_str = file_path.to_str().unwrap();
+
+    // Create test data
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("id", DataType::Int32, false),
+        Field::new("name", DataType::Utf8, true),
+    ]));
+
+    let batch = RecordBatch::try_new(
+        schema.clone(),
+        vec![
+            Arc::new(arrow::array::Int32Array::from(vec![1, 2, 3])),
+            Arc::new(arrow::array::StringArray::from(vec!["a", "b", "c"])),
+        ],
+    )?;
+
+    // Write test parquet file
+    let file = std::fs::File::create(file_path_str)?;
+    let props = WriterProperties::builder().build();
+    let mut writer = ArrowWriter::try_new(file, schema.clone(), Some(props))?;
+    writer.write(&batch)?;
+    writer.close()?;
+
+    // Create a session context
+    let ctx = SessionContext::new();
+
+    // Create a ParquetSource with the adapter factory
+    let source = ParquetSource::default()
+        .with_schema_adapter_factory(Arc::new(UppercaseAdapterFactory {}));
+
+    // Create a scan config
+    let config = FileScanConfigBuilder::new(
+        ObjectStoreUrl::parse(&format!("file://{}", file_path_str))?,
+        schema.clone(),
+    )
+    .with_source(source)
+    .build();
+
+    // Create a data source executor
+    let exec = DataSourceExec::from_data_source(config);
+
+    // Collect results
+    let task_ctx = ctx.task_ctx();
+    let stream = exec.execute(0, task_ctx)?;
+    let batches = datafusion::physical_plan::common::collect(stream).await?;
+
+    // There should be one batch
+    assert_eq!(batches.len(), 1);
+
+    // Verify the schema has uppercase column names
+    let result_schema = batches[0].schema();
+    assert_eq!(result_schema.field(0).name(), "ID");
+    assert_eq!(result_schema.field(1).name(), "NAME");
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_multi_source_schema_adapter_reuse() -> Result<()> {
+    // This test verifies that the same schema adapter factory can be reused
+    // across different file source types. This is important for ensuring that:
+    // 1. The schema adapter factory interface works uniformly across all source types
+    // 2. The factory can be shared and cloned efficiently using Arc
+    // 3. Various data source implementations correctly implement the schema adapter factory pattern
+
+    // Create a test factory
+    let factory = Arc::new(UppercaseAdapterFactory {});
+
+    // Apply the same adapter to different source types
+    let arrow_source =
+        ArrowSource::default().with_schema_adapter_factory(factory.clone());
+
+    #[cfg(feature = "parquet")]
+    let parquet_source =
+        ParquetSource::default().with_schema_adapter_factory(factory.clone());
+
+    #[cfg(feature = "csv")]
+    let csv_source = CsvSource::default().with_schema_adapter_factory(factory.clone());
+
+    // Verify adapters were properly set
+    assert!(arrow_source.schema_adapter_factory().is_some());
+
+    #[cfg(feature = "parquet")]
+    assert!(parquet_source.schema_adapter_factory().is_some());
+
+    #[cfg(feature = "csv")]
+    assert!(csv_source.schema_adapter_factory().is_some());
+
+    Ok(())
+}
+
+// Helper function to test From<T> for Arc<dyn FileSource> implementations
+fn test_from_impl<T: Into<Arc<dyn FileSource>> + Default>(expected_file_type: &str) {
+    let source = T::default();
+    let file_source: Arc<dyn FileSource> = source.into();
+    assert_eq!(file_source.file_type(), expected_file_type);
+}
+
+#[test]
+fn test_from_implementations() {
+    // Test From implementation for various sources
+    test_from_impl::<ArrowSource>("arrow");
+
+    #[cfg(feature = "parquet")]
+    test_from_impl::<ParquetSource>("parquet");
+
+    #[cfg(feature = "csv")]
+    test_from_impl::<CsvSource>("csv");
+
+    #[cfg(feature = "json")]
+    test_from_impl::<datafusion_datasource_json::JsonSource>("json");
+}
+
+/// A simple test schema adapter factory that doesn't modify the schema
+#[derive(Debug)]
+struct TestSchemaAdapterFactory {}
+
+impl SchemaAdapterFactory for TestSchemaAdapterFactory {
+    fn create(&self, schema: &Schema) -> Result<Box<dyn SchemaAdapter>> {
+        Ok(Box::new(TestSchemaAdapter {
+            input_schema: Arc::new(schema.clone()),
+        }))
+    }
+}
+
+/// A test schema adapter that passes through data unmodified
+#[derive(Debug)]
+struct TestSchemaAdapter {
+    input_schema: SchemaRef,
+}
+
+impl SchemaAdapter for TestSchemaAdapter {
+    fn adapt(&self, record_batch: RecordBatch) -> Result<RecordBatch> {
+        // Just pass through the batch unmodified
+        Ok(record_batch)
+    }
+
+    fn output_schema(&self) -> SchemaRef {
+        self.input_schema.clone()
+    }
+}
+
+#[cfg(feature = "parquet")]
+#[test]
+fn test_schema_adapter_preservation() {
+    // Create a test schema
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("id", DataType::Int32, false),
+        Field::new("name", DataType::Utf8, true),
+    ]));
+
+    // Create source with schema adapter factory
+    let source = ParquetSource::default();
+    let factory = Arc::new(TestSchemaAdapterFactory {});
+    let file_source = source.with_schema_adapter_factory(factory);
+
+    // Create a FileScanConfig with the source
+    let config_builder =
+        FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), schema.clone())
+            .with_source(file_source.clone())
+            // Add a file to make it valid
+            .with_file(PartitionedFile::new("test.parquet", 100));
+
+    let config = config_builder.build();
+
+    // Verify the schema adapter factory is present in the file source
+    assert!(config.source().schema_adapter_factory().is_some());
+}
diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs
index 393322a7f3..dc4d77194c 100644
--- a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs
+++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs
@@ -15,50 +15,41 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::{
-    any::Any,
-    fmt::{Display, Formatter},
-};
-use std::{
-    pin::Pin,
-    sync::Arc,
-    task::{Context, Poll},
-};
-
 use arrow::datatypes::SchemaRef;
 use arrow::error::ArrowError;
 use arrow::{array::RecordBatch, compute::concat_batches};
use datafusion::{datasource::object_store::ObjectStoreUrl, physical_plan::PhysicalExpr};
-use datafusion_common::{config::ConfigOptions, Statistics};
-use datafusion_common::{internal_err, Result};
-use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
-use datafusion_datasource::file_stream::FileOpenFuture;
-use datafusion_datasource::source::DataSourceExec;
+use datafusion_common::{config::ConfigOptions, internal_err, Result, Statistics};
 use datafusion_datasource::{
-    file::FileSource, file_scan_config::FileScanConfig, file_stream::FileOpener,
-};
-use datafusion_datasource::{
-    file_meta::FileMeta, schema_adapter::DefaultSchemaAdapterFactory, PartitionedFile,
+    file::FileSource, file_meta::FileMeta, file_scan_config::FileScanConfig,
+    file_scan_config::FileScanConfigBuilder, file_stream::FileOpenFuture,
+    file_stream::FileOpener, impl_schema_adapter_methods,
+    schema_adapter::DefaultSchemaAdapterFactory, schema_adapter::SchemaAdapterFactory,
+    source::DataSourceExec, PartitionedFile,
 };
 use datafusion_physical_expr::conjunction;
 use datafusion_physical_expr_common::physical_expr::fmt_sql;
 use datafusion_physical_optimizer::PhysicalOptimizerRule;
 use datafusion_physical_plan::{
-    displayable, metrics::ExecutionPlanMetricsSet, DisplayFormatType, ExecutionPlan,
-};
-use datafusion_physical_plan::{
+    displayable,
     filter::FilterExec,
     filter_pushdown::{
         ChildPushdownResult, FilterDescription, FilterPushdownPropagation,
         PredicateSupport, PredicateSupports,
     },
-    DisplayAs, PlanProperties,
+    metrics::ExecutionPlanMetricsSet,
+    DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
 };
-
 use futures::stream::BoxStream;
 use futures::{FutureExt, Stream};
 use object_store::ObjectStore;
-
+use std::{
+    any::Any,
+    fmt::{Display, Formatter},
+    pin::Pin,
+    sync::Arc,
+    task::{Context, Poll},
+};
 pub struct TestOpener {
     batches: Vec<RecordBatch>,
     batch_size: Option<usize>,
@@ -119,19 +110,16 @@ pub struct TestSource {
     schema: Option<SchemaRef>,
     metrics: ExecutionPlanMetricsSet,
     projection: Option<Vec<usize>>,
+    schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
 }
 
 impl TestSource {
     fn new(support: bool, batches: Vec<RecordBatch>) -> Self {
         Self {
             support,
-            predicate: None,
-            statistics: None,
-            batch_size: None,
-            schema: None,
-            projection: None,
             metrics: ExecutionPlanMetricsSet::new(),
             batches,
+            ..Default::default()
         }
     }
 }
@@ -243,6 +231,8 @@ impl FileSource for TestSource {
             Ok(FilterPushdownPropagation::unsupported(filters))
         }
     }
+
+    impl_schema_adapter_methods!();
 }
 
 #[derive(Debug, Clone)]
diff --git a/datafusion/core/tests/test_adapter_updated.rs b/datafusion/core/tests/test_adapter_updated.rs
new file mode 100644
index 0000000000..c85b9a3447
--- /dev/null
+++ b/datafusion/core/tests/test_adapter_updated.rs
@@ -0,0 +1,214 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use arrow::record_batch::RecordBatch;
+use datafusion_common::{ColumnStatistics, DataFusionError, Result, Statistics};
+use datafusion_datasource::file::FileSource;
+use datafusion_datasource::file_scan_config::FileScanConfig;
+use datafusion_datasource::file_stream::FileOpener;
+use datafusion_datasource::schema_adapter::{
+    SchemaAdapter, SchemaAdapterFactory, SchemaMapper,
+};
+use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
+use object_store::ObjectStore;
+use std::any::Any;
+use std::fmt::Debug;
+use std::sync::Arc;
+
+/// A test source for testing schema adapters
+#[derive(Debug, Clone)]
+struct TestSource {
+    schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
+}
+
+impl TestSource {
+    fn new() -> Self {
+        Self {
+            schema_adapter_factory: None,
+        }
+    }
+}
+
+impl FileSource for TestSource {
+    fn file_type(&self) -> &str {
+        "test"
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn create_file_opener(
+        &self,
+        _store: Arc<dyn ObjectStore>,
+        _conf: &FileScanConfig,
+        _index: usize,
+    ) -> Arc<dyn FileOpener> {
+        unimplemented!("Not needed for this test")
+    }
+
+    fn with_batch_size(&self, _batch_size: usize) -> Arc<dyn FileSource> {
+        Arc::new(self.clone())
+    }
+
+    fn with_schema(&self, _schema: SchemaRef) -> Arc<dyn FileSource> {
+        Arc::new(self.clone())
+    }
+
+    fn with_projection(&self, _projection: &FileScanConfig) -> Arc<dyn FileSource> {
+        Arc::new(self.clone())
+    }
+
+    fn with_statistics(&self, _statistics: Statistics) -> Arc<dyn FileSource> {
+        Arc::new(self.clone())
+    }
+
+    fn metrics(&self) -> &ExecutionPlanMetricsSet {
+        unimplemented!("Not needed for this test")
+    }
+
+    fn statistics(&self) -> Result<Statistics, DataFusionError> {
+        Ok(Statistics::default())
+    }
+
+    fn with_schema_adapter_factory(
+        &self,
+        schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
+    ) -> Arc<dyn FileSource> {
+        Arc::new(Self {
+            schema_adapter_factory: Some(schema_adapter_factory),
+        })
+    }
+
+    fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
+        self.schema_adapter_factory.clone()
+    }
+}
+
+/// A test schema adapter factory
+#[derive(Debug)]
+struct TestSchemaAdapterFactory {}
+
+impl SchemaAdapterFactory for TestSchemaAdapterFactory {
+    fn create(
+        &self,
+        projected_table_schema: SchemaRef,
+        _table_schema: SchemaRef,
+    ) -> Box<dyn SchemaAdapter> {
+        Box::new(TestSchemaAdapter {
+            table_schema: projected_table_schema,
+        })
+    }
+}
+
+/// A test schema adapter implementation
+#[derive(Debug)]
+struct TestSchemaAdapter {
+    table_schema: SchemaRef,
+}
+
+impl SchemaAdapter for TestSchemaAdapter {
+    fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
+        let field = self.table_schema.field(index);
+        file_schema.fields.find(field.name()).map(|(i, _)| i)
+    }
+
+    fn map_schema(
+        &self,
+        file_schema: &Schema,
+    ) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
+        let mut projection = Vec::with_capacity(file_schema.fields().len());
+        for (file_idx, file_field) in file_schema.fields().iter().enumerate() {
+            if self.table_schema.fields().find(file_field.name()).is_some() {
+                projection.push(file_idx);
+            }
+        }
+
+        Ok((Arc::new(TestSchemaMapping {}), projection))
+    }
+}
+
+/// A test schema mapper implementation
+#[derive(Debug)]
+struct TestSchemaMapping {}
+
+impl SchemaMapper for TestSchemaMapping {
+    fn map_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
+        // For testing, just return the original batch
+        Ok(batch)
+    }
+
+    fn map_column_statistics(
+        &self,
+        stats: &[ColumnStatistics],
+    ) -> Result<Vec<ColumnStatistics>> {
+        // For testing, just return the input statistics
+        Ok(stats.to_vec())
+    }
+}
+
+#[test]
+fn test_schema_adapter() {
+    // This test verifies the functionality of the SchemaAdapter and SchemaAdapterFactory
+    // components used in DataFusion's file sources.
+    //
+    // The test specifically checks:
+    // 1. Creating and attaching a schema adapter factory to a file source
+    // 2. Creating a schema adapter using the factory
+    // 3. The schema adapter's ability to map column indices between a table schema and a file schema
+    // 4. The schema adapter's ability to create a projection that selects only the columns
+    //    from the file schema that are present in the table schema
+    //
+    // Schema adapters are used when the schema of data in files doesn't exactly match
+    // the schema expected by the query engine, allowing for field mapping and data transformation.
+
+    // Create a test schema
+    let table_schema = Arc::new(Schema::new(vec![
+        Field::new("id", DataType::Int32, false),
+        Field::new("name", DataType::Utf8, true),
+    ]));
+
+    // Create a file schema
+    let file_schema = Schema::new(vec![
+        Field::new("id", DataType::Int32, false),
+        Field::new("name", DataType::Utf8, true),
+        Field::new("extra", DataType::Int64, true),
+    ]);
+
+    // Create a TestSource
+    let source = TestSource::new();
+    assert!(source.schema_adapter_factory().is_none());
+
+    // Add a schema adapter factory
+    let factory = Arc::new(TestSchemaAdapterFactory {});
+    let source_with_adapter = source.with_schema_adapter_factory(factory);
+    assert!(source_with_adapter.schema_adapter_factory().is_some());
+
+    // Create a schema adapter
+    let adapter_factory = source_with_adapter.schema_adapter_factory().unwrap();
+    let adapter =
+        adapter_factory.create(Arc::clone(&table_schema), Arc::clone(&table_schema));
+
+    // Test mapping column index
+    assert_eq!(adapter.map_column_index(0, &file_schema), Some(0));
+    assert_eq!(adapter.map_column_index(1, &file_schema), Some(1));
+
+    // Test creating schema mapper
+    let (_mapper, projection) = adapter.map_schema(&file_schema).unwrap();
+    assert_eq!(projection, vec![0, 1]);
+}
diff --git a/datafusion/datasource-avro/src/source.rs b/datafusion/datasource-avro/src/source.rs
index abd049a50e..2fdf34b3cc 100644
--- a/datafusion/datasource-avro/src/source.rs
+++ b/datafusion/datasource-avro/src/source.rs
@@ -28,6 +28,8 @@ use datafusion_common::Statistics;
 use datafusion_datasource::file::FileSource;
 use datafusion_datasource::file_scan_config::FileScanConfig;
 use datafusion_datasource::file_stream::FileOpener;
+use datafusion_datasource::impl_schema_adapter_methods;
+use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
 use datafusion_physical_expr_common::sort_expr::LexOrdering;
 use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
 
@@ -41,6 +43,7 @@ pub struct AvroSource {
     projection: Option<Vec<String>>,
     metrics: ExecutionPlanMetricsSet,
     projected_statistics: Option<Statistics>,
+    schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
 }
 
 impl AvroSource {
@@ -123,6 +126,8 @@ impl FileSource for AvroSource {
     ) -> Result<Option<FileScanConfig>> {
         Ok(None)
     }
+
+    impl_schema_adapter_methods!();
 }
 
 mod private {
diff --git a/datafusion/datasource-csv/src/source.rs b/datafusion/datasource-csv/src/source.rs
index cbadb5dd91..d45080dc20 100644
--- a/datafusion/datasource-csv/src/source.rs
+++ b/datafusion/datasource-csv/src/source.rs
@@ -17,6 +17,7 @@
 
 //! Execution plan for reading CSV files
 
+use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
 use std::any::Any;
 use std::fmt;
 use std::io::{Read, Seek, SeekFrom};
@@ -28,7 +29,8 @@ use datafusion_datasource::file_compression_type::FileCompressionType;
 use datafusion_datasource::file_meta::FileMeta;
 use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
 use datafusion_datasource::{
-    calculate_range, FileRange, ListingTableUrl, RangeCalculation,
+    as_file_source, calculate_range, impl_schema_adapter_methods, FileRange,
+    ListingTableUrl, RangeCalculation,
 };
 
 use arrow::csv;
@@ -91,6 +93,7 @@ pub struct CsvSource {
     comment: Option<u8>,
     metrics: ExecutionPlanMetricsSet,
     projected_statistics: Option<Statistics>,
+    schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
 }
 
 impl CsvSource {
@@ -212,6 +215,12 @@ impl CsvOpener {
     }
 }
 
+impl From<CsvSource> for Arc<dyn FileSource> {
+    fn from(source: CsvSource) -> Self {
+        as_file_source(source)
+    }
+}
+
 impl FileSource for CsvSource {
     fn create_file_opener(
         &self,
@@ -274,6 +283,8 @@ impl FileSource for CsvSource {
             DisplayFormatType::TreeRender => Ok(()),
         }
     }
+
+    impl_schema_adapter_methods!();
 }
 
 impl FileOpener for CsvOpener {
diff --git a/datafusion/datasource-json/src/source.rs b/datafusion/datasource-json/src/source.rs
index 982b799556..187876522e 100644
--- a/datafusion/datasource-json/src/source.rs
+++ b/datafusion/datasource-json/src/source.rs
@@ -30,7 +30,11 @@ use datafusion_datasource::decoder::{deserialize_stream, DecoderDeserializer};
 use datafusion_datasource::file_compression_type::FileCompressionType;
 use datafusion_datasource::file_meta::FileMeta;
 use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
-use datafusion_datasource::{calculate_range, ListingTableUrl, RangeCalculation};
+use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
+use datafusion_datasource::{
+    as_file_source, calculate_range, impl_schema_adapter_methods, ListingTableUrl,
+    RangeCalculation,
+};
 use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
 
 use arrow::json::ReaderBuilder;
@@ -77,6 +81,7 @@ pub struct JsonSource {
     batch_size: Option<usize>,
     metrics: ExecutionPlanMetricsSet,
     projected_statistics: Option<Statistics>,
+    schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
 }
 
 impl JsonSource {
@@ -86,6 +91,12 @@ impl JsonSource {
     }
 }
 
+impl From<JsonSource> for Arc<dyn FileSource> {
+    fn from(source: JsonSource) -> Self {
+        as_file_source(source)
+    }
+}
+
 impl FileSource for JsonSource {
     fn create_file_opener(
         &self,
@@ -140,6 +151,7 @@ impl FileSource for JsonSource {
     fn file_type(&self) -> &str {
         "json"
     }
+    impl_schema_adapter_methods!();
 }
 
 impl FileOpener for JsonOpener {
diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs
index 253bd8872d..851e336443 100644
--- a/datafusion/datasource-parquet/src/file_format.rs
+++ b/datafusion/datasource-parquet/src/file_format.rs
@@ -422,9 +422,11 @@ impl FileFormat for ParquetFormat {
         if let Some(metadata_size_hint) = metadata_size_hint {
             source = source.with_metadata_size_hint(metadata_size_hint)
         }
+        // Apply schema adapter factory before building the new config
+        let file_source = source.apply_schema_adapter(&conf);
 
         let conf = FileScanConfigBuilder::from(conf)
-            .with_source(Arc::new(source))
+            .with_source(file_source)
             .build();
         Ok(DataSourceExec::from_data_source(conf))
     }
diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs
index 69347f440c..30b774d08f 100644
--- a/datafusion/datasource-parquet/src/source.rs
+++ b/datafusion/datasource-parquet/src/source.rs
@@ -27,7 +27,9 @@ use crate::row_filter::can_expr_be_pushed_down_with_schemas;
 use crate::DefaultParquetFileReaderFactory;
 use crate::ParquetFileReaderFactory;
 use datafusion_common::config::ConfigOptions;
+use datafusion_datasource::as_file_source;
 use datafusion_datasource::file_stream::FileOpener;
+use datafusion_datasource::impl_schema_adapter_methods;
 use datafusion_datasource::schema_adapter::{
     DefaultSchemaAdapterFactory, SchemaAdapterFactory,
 };
@@ -49,7 +51,6 @@ use datafusion_physical_plan::DisplayFormatType;
 
 use itertools::Itertools;
 use object_store::ObjectStore;
-
 /// Execution plan for reading one or more Parquet files.
 ///
 /// ```text
@@ -343,25 +344,6 @@ impl ParquetSource {
         self
     }
 
-    /// return the optional schema adapter factory
-    pub fn schema_adapter_factory(&self) -> Option<&Arc<dyn SchemaAdapterFactory>> {
-        self.schema_adapter_factory.as_ref()
-    }
-
-    /// Set optional schema adapter factory.
-    ///
-    /// [`SchemaAdapterFactory`] allows user to specify how fields from the
-    /// parquet file get mapped to that of the table schema.  The default schema
-    /// adapter uses arrow's cast library to map the parquet fields to the table
-    /// schema.
-    pub fn with_schema_adapter_factory(
-        mut self,
-        schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
-    ) -> Self {
-        self.schema_adapter_factory = Some(schema_adapter_factory);
-        self
-    }
-
     /// If true, the predicate will be used during the parquet scan.
     /// Defaults to false
     ///
@@ -426,6 +408,25 @@ impl ParquetSource {
     fn bloom_filter_on_read(&self) -> bool {
         self.table_parquet_options.global.bloom_filter_on_read
     }
+
+    /// Applies schema adapter factory from the FileScanConfig if present.
+    ///
+    /// # Arguments
+    /// * `conf` - FileScanConfig that may contain a schema adapter factory
+    /// # Returns
+    /// The converted FileSource with schema adapter factory applied if provided
+    pub fn apply_schema_adapter(self, conf: &FileScanConfig) -> Arc<dyn FileSource> {
+        let file_source: Arc<dyn FileSource> = self.into();
+
+        // If the FileScanConfig.file_source() has a schema adapter factory, apply it
+        if let Some(factory) = conf.file_source().schema_adapter_factory() {
+            file_source.with_schema_adapter_factory(
+                Arc::<dyn SchemaAdapterFactory>::clone(&factory),
+            )
+        } else {
+            file_source
+        }
+    }
 }
 
/// Parses datafusion.common.config.ParquetOptions.coerce_int96 String to a arrow_schema.datatype.TimeUnit
@@ -446,6 +447,13 @@ pub(crate) fn parse_coerce_int96_string(
     }
 }
 
+/// Allows easy conversion from ParquetSource to Arc<dyn FileSource>
+impl From<ParquetSource> for Arc<dyn FileSource> {
+    fn from(source: ParquetSource) -> Self {
+        as_file_source(source)
+    }
+}
+
 impl FileSource for ParquetSource {
     fn create_file_opener(
         &self,
@@ -656,4 +664,5 @@ impl FileSource for ParquetSource {
    );
        Ok(FilterPushdownPropagation::with_filters(filters).with_updated_node(source))
     }
+    impl_schema_adapter_methods!();
 }
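
The `apply_schema_adapter` method above is what `ParquetFormat::create_physical_plan` now calls (see the file_format.rs hunk earlier) so that a factory already set on the config's source survives the rebuild of the source. A rough sketch of that flow, with `build_parquet_source` as a hypothetical stand-in for the call site:

    use std::sync::Arc;
    use datafusion_datasource::file::FileSource;
    use datafusion_datasource::file_scan_config::FileScanConfig;
    use datafusion_datasource_parquet::source::ParquetSource;

    // Hypothetical helper mirroring the file_format.rs call site.
    fn build_parquet_source(conf: &FileScanConfig) -> Arc<dyn FileSource> {
        let source = ParquetSource::default();
        // Re-applies the schema adapter factory from conf's current source if
        // one was set; otherwise the source is returned unchanged, converted
        // via the new From impl backed by as_file_source.
        source.apply_schema_adapter(conf)
    }
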
diff --git a/datafusion/datasource-parquet/tests/apply_schema_adapter_tests.rs b/datafusion/datasource-parquet/tests/apply_schema_adapter_tests.rs
new file mode 100644
index 0000000000..89406fb742
--- /dev/null
+++ b/datafusion/datasource-parquet/tests/apply_schema_adapter_tests.rs
@@ -0,0 +1,206 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+mod parquet_adapter_tests {
+    use arrow::{
+        datatypes::{DataType, Field, Schema, SchemaRef},
+        record_batch::RecordBatch,
+    };
+    use datafusion_common::{ColumnStatistics, DataFusionError, Result};
+    use datafusion_datasource::{
+        file::FileSource,
+        file_scan_config::FileScanConfigBuilder,
+        schema_adapter::{SchemaAdapter, SchemaAdapterFactory, SchemaMapper},
+    };
+    use datafusion_datasource_parquet::source::ParquetSource;
+    use datafusion_execution::object_store::ObjectStoreUrl;
+    use std::{fmt::Debug, sync::Arc};
+
+    /// A test schema adapter factory that adds prefix to column names
+    #[derive(Debug)]
+    struct PrefixAdapterFactory {
+        prefix: String,
+    }
+
+    impl SchemaAdapterFactory for PrefixAdapterFactory {
+        fn create(
+            &self,
+            projected_table_schema: SchemaRef,
+            _table_schema: SchemaRef,
+        ) -> Box<dyn SchemaAdapter> {
+            Box::new(PrefixAdapter {
+                input_schema: projected_table_schema,
+                prefix: self.prefix.clone(),
+            })
+        }
+    }
+
+    /// A test schema adapter that adds prefix to column names
+    #[derive(Debug)]
+    struct PrefixAdapter {
+        input_schema: SchemaRef,
+        prefix: String,
+    }
+
+    impl SchemaAdapter for PrefixAdapter {
+        fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
+            let field = self.input_schema.field(index);
+            file_schema.fields.find(field.name()).map(|(i, _)| i)
+        }
+
+        fn map_schema(
+            &self,
+            file_schema: &Schema,
+        ) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
+            let mut projection = Vec::with_capacity(file_schema.fields().len());
+            for (file_idx, file_field) in file_schema.fields().iter().enumerate() {
+                if self.input_schema.fields().find(file_field.name()).is_some() {
+                    projection.push(file_idx);
+                }
+            }
+
+            // Create a schema mapper that adds a prefix to column names
+            #[derive(Debug)]
+            struct PrefixSchemaMapping {
+                // Keep only the prefix field which is actually used in the implementation
+                prefix: String,
+            }
+
+            impl SchemaMapper for PrefixSchemaMapping {
+                fn map_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
+                    // Create a new schema with prefixed field names
+                    let prefixed_fields: Vec<Field> = batch
+                        .schema()
+                        .fields()
+                        .iter()
+                        .map(|field| {
+                            Field::new(
+                                format!("{}{}", self.prefix, field.name()),
+                                field.data_type().clone(),
+                                field.is_nullable(),
+                            )
+                        })
+                        .collect();
+                    let prefixed_schema = Arc::new(Schema::new(prefixed_fields));
+
+                    // Create a new batch with the prefixed schema but the same data
+                    let options = arrow::record_batch::RecordBatchOptions::default();
+                    RecordBatch::try_new_with_options(
+                        prefixed_schema,
+                        batch.columns().to_vec(),
+                        &options,
+                    )
+                    .map_err(|e| DataFusionError::ArrowError(e, None))
+                }
+
+                fn map_column_statistics(
+                    &self,
+                    stats: &[ColumnStatistics],
+                ) -> Result<Vec<ColumnStatistics>> {
+                    // For testing, just return the input statistics
+                    Ok(stats.to_vec())
+                }
+            }
+
+            Ok((
+                Arc::new(PrefixSchemaMapping {
+                    prefix: self.prefix.clone(),
+                }),
+                projection,
+            ))
+        }
+    }
+
+    #[test]
+    fn test_apply_schema_adapter_with_factory() {
+        // Create a schema
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("name", DataType::Utf8, true),
+        ]));
+
+        // Create a parquet source
+        let source = ParquetSource::default();
+
+        // Create a file scan config with source that has a schema adapter factory
+        let factory = Arc::new(PrefixAdapterFactory {
+            prefix: "test_".to_string(),
+        });
+
+        let file_source = source.clone().with_schema_adapter_factory(factory);
+
+        let config = FileScanConfigBuilder::new(
+            ObjectStoreUrl::local_filesystem(),
+            schema.clone(),
+            file_source,
+        )
+        .build();
+
+        // Apply schema adapter to a new source
+        let result_source = source.apply_schema_adapter(&config);
+
+        // Verify the adapter was applied
+        assert!(result_source.schema_adapter_factory().is_some());
+
+        // Create adapter and test it produces expected schema
+        let adapter_factory = result_source.schema_adapter_factory().unwrap();
+        let adapter = adapter_factory.create(schema.clone(), schema.clone());
+
+        // Create a dummy batch to test the schema mapping
+        let dummy_batch = RecordBatch::new_empty(schema.clone());
+
+        // Get the file schema (which is the same as the table schema in this test)
+        let (mapper, _) = adapter.map_schema(&schema).unwrap();
+
+        // Apply the mapping to get the output schema
+        let mapped_batch = mapper.map_batch(dummy_batch).unwrap();
+        let output_schema = mapped_batch.schema();
+
+        // Check the column names have the prefix
+        assert_eq!(output_schema.field(0).name(), "test_id");
+        assert_eq!(output_schema.field(1).name(), "test_name");
+    }
+
+    #[test]
+    fn test_apply_schema_adapter_without_factory() {
+        // Create a schema
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("name", DataType::Utf8, true),
+        ]));
+
+        // Create a parquet source
+        let source = ParquetSource::default();
+
+        // Convert to Arc<dyn FileSource>
+        let file_source: Arc<dyn FileSource> = Arc::new(source.clone());
+
+        // Create a file scan config without a schema adapter factory
+        let config = FileScanConfigBuilder::new(
+            ObjectStoreUrl::local_filesystem(),
+            schema.clone(),
+            file_source,
+        )
+        .build();
+
+        // Apply schema adapter function - should pass through the source unchanged
+        let result_source = source.apply_schema_adapter(&config);
+
+        // Verify no adapter was applied
+        assert!(result_source.schema_adapter_factory().is_none());
+    }
+}
diff --git a/datafusion/datasource/src/file.rs b/datafusion/datasource/src/file.rs
index c9b5c416f0..d0557e9f08 100644
--- a/datafusion/datasource/src/file.rs
+++ b/datafusion/datasource/src/file.rs
@@ -25,6 +25,7 @@ use std::sync::Arc;
 use crate::file_groups::FileGroupPartitioner;
 use crate::file_scan_config::FileScanConfig;
 use crate::file_stream::FileOpener;
+use crate::schema_adapter::SchemaAdapterFactory;
 use arrow::datatypes::SchemaRef;
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::{Result, Statistics};
@@ -35,6 +36,11 @@ use datafusion_physical_plan::DisplayFormatType;
 
 use object_store::ObjectStore;
 
+/// Helper function to convert any type implementing FileSource to Arc<dyn FileSource>
+pub fn as_file_source<T: FileSource + 'static>(source: T) -> Arc<dyn FileSource> {
+    Arc::new(source)
+}
+
 /// file format specific behaviors for elements in [`DataSource`]
 ///
 /// See more details on specific implementations:
@@ -116,4 +122,23 @@ pub trait FileSource: Send + Sync {
     ) -> Result<FilterPushdownPropagation<Arc<dyn FileSource>>> {
         Ok(FilterPushdownPropagation::unsupported(filters))
     }
+
+    /// Set optional schema adapter factory.
+    ///
+    /// [`SchemaAdapterFactory`] allows users to specify how fields from the
+    /// file get mapped to those of the table schema.
+    ///
+    /// Note: You can implement this method and `schema_adapter_factory`
+    /// automatically using the [`crate::impl_schema_adapter_methods`] macro.
+    fn with_schema_adapter_factory(
+        &self,
+        factory: Arc<dyn SchemaAdapterFactory>,
+    ) -> Arc<dyn FileSource>;
+
+    /// Returns the current schema adapter factory if set
+    ///
+    /// Note: You can implement this method and `with_schema_adapter_factory`
+    /// automatically using the [`crate::impl_schema_adapter_methods`] macro.
+    fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>>;
 }
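
The `as_file_source` helper introduced above backs the new `From<T> for Arc<dyn FileSource>` impls on the Arrow, CSV, JSON, and Parquet sources. A small sketch of both conversion paths, assuming the `datafusion-datasource-json` crate is available as in the tests above:

    use std::sync::Arc;
    use datafusion_datasource::{as_file_source, file::FileSource};
    use datafusion_datasource_json::JsonSource;

    fn main() {
        // Direct conversion through the helper...
        let direct: Arc<dyn FileSource> = as_file_source(JsonSource::default());
        // ...or through the From impl, which calls as_file_source internally.
        let via_from: Arc<dyn FileSource> = JsonSource::default().into();
        assert_eq!(direct.file_type(), via_from.file_type()); // both "json"
    }
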
diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs
index f93792b7fa..39431f3e9f 100644
--- a/datafusion/datasource/src/file_scan_config.rs
+++ b/datafusion/datasource/src/file_scan_config.rs
@@ -24,6 +24,8 @@ use std::{
 };
 
 use crate::file_groups::FileGroup;
+#[allow(unused_imports)]
+use crate::schema_adapter::SchemaAdapterFactory;
 use crate::{
     display::FileGroupsDisplay,
     file::FileSource,
@@ -83,6 +85,7 @@ use log::{debug, warn};
 /// # use datafusion_execution::object_store::ObjectStoreUrl;
 /// # use datafusion_physical_plan::ExecutionPlan;
 /// # use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
+/// # use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
 /// # let file_schema = Arc::new(Schema::new(vec![
 /// #  Field::new("c1", DataType::Int32, false),
 /// #  Field::new("c2", DataType::Int32, false),
@@ -92,7 +95,8 @@ use log::{debug, warn};
/// # // Note: crate mock ParquetSource, as ParquetSource is not in the datasource crate
 /// #[derive(Clone)]
 /// # struct ParquetSource {
-/// #    projected_statistics: Option<Statistics>
+/// #    projected_statistics: Option<Statistics>,
+/// #    schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>
 /// # };
 /// # impl FileSource for ParquetSource {
/// #  fn create_file_opener(&self, _: Arc<dyn ObjectStore>, _: &FileScanConfig, _: usize) -> Arc<dyn FileOpener> { unimplemented!() }
@@ -100,13 +104,15 @@ use log::{debug, warn};
/// #  fn with_batch_size(&self, _: usize) -> Arc<dyn FileSource> { unimplemented!() }
/// #  fn with_schema(&self, _: SchemaRef) -> Arc<dyn FileSource> { Arc::new(self.clone()) as Arc<dyn FileSource> }
/// #  fn with_projection(&self, _: &FileScanConfig) -> Arc<dyn FileSource> { unimplemented!() }
-/// #  fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> { Arc::new(Self {projected_statistics: Some(statistics)} ) }
+/// #  fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> { Arc::new(Self {projected_statistics: Some(statistics), schema_adapter_factory: self.schema_adapter_factory.clone()} ) }
 /// #  fn metrics(&self) -> &ExecutionPlanMetricsSet { unimplemented!() }
/// #  fn statistics(&self) -> datafusion_common::Result<Statistics> { Ok(self.projected_statistics.clone().expect("projected_statistics should be set")) }
 /// #  fn file_type(&self) -> &str { "parquet" }
+/// #  fn with_schema_adapter_factory(&self, factory: Arc<dyn SchemaAdapterFactory>) -> Arc<dyn FileSource> { Arc::new(Self {projected_statistics: self.projected_statistics.clone(), schema_adapter_factory: Some(factory)} ) }
+/// #  fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> { self.schema_adapter_factory.clone() }
 /// #  }
 /// # impl ParquetSource {
-/// #  fn new() -> Self { Self {projected_statistics: None} }
+/// #  fn new() -> Self { Self {projected_statistics: None, schema_adapter_factory: None} }
 /// # }
 /// // create FileScan config for reading parquet files from file://
 /// let object_store_url = ObjectStoreUrl::local_filesystem();
diff --git a/datafusion/datasource/src/macros.rs b/datafusion/datasource/src/macros.rs
new file mode 100644
index 0000000000..c7a4058f23
--- /dev/null
+++ b/datafusion/datasource/src/macros.rs
@@ -0,0 +1,145 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Macros for the datafusion-datasource crate
+
+/// Helper macro to generate schema adapter methods for FileSource implementations
+///
+/// Place this inside *any* `impl FileSource for YourType { … }` to
+/// avoid copy-pasting `with_schema_adapter_factory` and
+/// `schema_adapter_factory`.
+///
+/// # Availability
+///
+/// This macro is exported at the crate root level via `#[macro_export]`, so it can be
+/// imported directly from the crate:
+///
+/// ```rust,no_run
+/// use datafusion_datasource::impl_schema_adapter_methods;
+/// ```
+///
+/// # Note on path resolution
+/// When this macro is used:
+/// - `$crate` expands to `datafusion_datasource` (the crate root)
+/// - `$crate::file::FileSource` refers to the FileSource trait from this crate
+/// - `$crate::schema_adapter::SchemaAdapterFactory` refers to the SchemaAdapterFactory trait
+///
+/// # Example Usage
+///
+/// ```rust,no_run
+/// use std::sync::Arc;
+/// use std::any::Any;
+/// use std::fmt::{Formatter, Display, self};
+/// use arrow::datatypes::SchemaRef;
+/// use datafusion_common::{Result, Statistics};
+/// use object_store::ObjectStore;
+/// use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
+/// use datafusion_physical_plan::DisplayFormatType;
+/// use datafusion_physical_expr_common::sort_expr::LexOrdering;
+/// use datafusion_datasource::file::FileSource;
+/// use datafusion_datasource::file_stream::FileOpener;
+/// use datafusion_datasource::file_scan_config::FileScanConfig;
+/// use datafusion_datasource::impl_schema_adapter_methods;
+/// use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
+///
+/// #[derive(Clone)]
+/// struct MyFileSource {
+///     schema: SchemaRef,
+///     batch_size: usize,
+///     statistics: Statistics,
+///     projection: Option<Vec<usize>>,
+///     schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
+///     metrics: ExecutionPlanMetricsSet,
+/// }
+///
+/// impl FileSource for MyFileSource {
+///     fn create_file_opener(
+///         &self,
+///         object_store: Arc<dyn ObjectStore>,
+///         base_config: &FileScanConfig,
+///         partition: usize,
+///     ) -> Arc<dyn FileOpener> {
+///         // Implementation here
+///         unimplemented!()
+///     }
+///     
+///     fn as_any(&self) -> &dyn Any {
+///         self
+///     }
+///     
+///     fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
+///         let mut new_source = self.clone();
+///         new_source.batch_size = batch_size;
+///         Arc::new(new_source)
+///     }
+///     
+///     fn with_schema(&self, schema: SchemaRef) -> Arc<dyn FileSource> {
+///         let mut new_source = self.clone();
+///         new_source.schema = schema;
+///         Arc::new(new_source)
+///     }
+///     
+///     fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource> {
+///         let mut new_source = self.clone();
+///         new_source.projection = config.file_column_projection_indices();
+///         Arc::new(new_source)
+///     }
+///     
+///     fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
+///         let mut new_source = self.clone();
+///         new_source.statistics = statistics;
+///         Arc::new(new_source)
+///     }
+///     
+///     fn metrics(&self) -> &ExecutionPlanMetricsSet {
+///         &self.metrics
+///     }
+///     
+///     fn statistics(&self) -> Result<Statistics> {
+///         Ok(self.statistics.clone())
+///     }
+///     
+///     fn file_type(&self) -> &str {
+///         "my_file_type"
+///     }
+///     
+///     // Use the macro to implement schema adapter methods
+///     impl_schema_adapter_methods!();
+/// }
+/// ```
+#[macro_export(local_inner_macros)]
+macro_rules! impl_schema_adapter_methods {
+    () => {
+        fn with_schema_adapter_factory(
+            &self,
+            schema_adapter_factory: std::sync::Arc<
+                dyn $crate::schema_adapter::SchemaAdapterFactory,
+            >,
+        ) -> std::sync::Arc<dyn $crate::file::FileSource> {
+            std::sync::Arc::new(Self {
+                schema_adapter_factory: Some(schema_adapter_factory),
+                ..self.clone()
+            })
+        }
+
+        fn schema_adapter_factory(
+            &self,
+        ) -> Option<std::sync::Arc<dyn $crate::schema_adapter::SchemaAdapterFactory>> {
+            self.schema_adapter_factory.clone()
+        }
+    };
+}
diff --git a/datafusion/datasource/src/mod.rs b/datafusion/datasource/src/mod.rs
index 9e83adc6b9..1c27cd4922 100644
--- a/datafusion/datasource/src/mod.rs
+++ b/datafusion/datasource/src/mod.rs
@@ -37,6 +37,7 @@ pub mod file_meta;
 pub mod file_scan_config;
 pub mod file_sink_config;
 pub mod file_stream;
+pub mod macros;
 pub mod memory;
 pub mod schema_adapter;
 pub mod sink;
@@ -48,6 +49,7 @@ pub mod test_util;
 
 pub mod url;
 pub mod write;
+pub use self::file::as_file_source;
 pub use self::url::ListingTableUrl;
 use crate::file_groups::FileGroup;
 use chrono::TimeZone;
diff --git a/datafusion/datasource/src/test_util.rs b/datafusion/datasource/src/test_util.rs
index e75c7c3a3d..aac61c7812 100644
--- a/datafusion/datasource/src/test_util.rs
+++ b/datafusion/datasource/src/test_util.rs
@@ -17,6 +17,7 @@
 
 use crate::{
    file::FileSource, file_scan_config::FileScanConfig, file_stream::FileOpener,
+    impl_schema_adapter_methods, schema_adapter::SchemaAdapterFactory,
 };
 
 use std::sync::Arc;
@@ -32,6 +33,7 @@ use object_store::ObjectStore;
 pub(crate) struct MockSource {
     metrics: ExecutionPlanMetricsSet,
     projected_statistics: Option<Statistics>,
+    schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
 }
 
 impl FileSource for MockSource {
@@ -81,6 +83,8 @@ impl FileSource for MockSource {
     fn file_type(&self) -> &str {
         "mock"
     }
+
+    impl_schema_adapter_methods!();
 }
 
 /// Create a column expression

