This is an automated email from the ASF dual-hosted git repository.

xudong963 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 3236cc04ac Simplify FileSource / SchemaAdapterFactory API (#16214)
3236cc04ac is described below

commit 3236cc04ac5c05e37e0fc51ea3c22c8a543ff332
Author: Andrew Lamb <and...@nerdnetworks.org>
AuthorDate: Tue Jun 3 22:10:00 2025 -0400

    Simplify FileSource / SchemaAdapterFactory API (#16214)
    
    * Make FileFormat::with_schema_adapter_factory fallible, remove macros
    
    * Remove standalone integration test
    
    * Update doc test
---
 datafusion/core/src/datasource/mod.rs              |   3 +-
 .../src/datasource/physical_plan/arrow_file.rs     |  16 +-
 .../schema_adapter_integration_tests.rs            | 185 ++++++++++++++++++
 .../physical_optimizer/filter_pushdown/util.rs     |  19 +-
 datafusion/core/tests/test_adapter_updated.rs      | 214 ---------------------
 datafusion/datasource-avro/src/source.rs           |  15 +-
 datafusion/datasource-csv/src/source.rs            |  17 +-
 datafusion/datasource-json/src/source.rs           |  18 +-
 datafusion/datasource-parquet/src/file_format.rs   |   2 +-
 datafusion/datasource-parquet/src/source.rs        |  23 ++-
 .../tests/apply_schema_adapter_tests.rs            |   6 +-
 datafusion/datasource/src/file.rs                  |  27 ++-
 datafusion/datasource/src/file_scan_config.rs      |   5 +-
 datafusion/datasource/src/macros.rs                | 145 --------------
 datafusion/datasource/src/mod.rs                   |   1 -
 datafusion/datasource/src/test_util.rs             |  16 +-
 16 files changed, 315 insertions(+), 397 deletions(-)

diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs
index f0c6771515..b3d69064ff 100644
--- a/datafusion/core/src/datasource/mod.rs
+++ b/datafusion/core/src/datasource/mod.rs
@@ -122,7 +122,8 @@ mod tests {
 
         let schema = Arc::new(Schema::new(vec![f1.clone(), f2.clone()]));
         let source = ParquetSource::default()
-            .with_schema_adapter_factory(Arc::new(TestSchemaAdapterFactory {}));
+            .with_schema_adapter_factory(Arc::new(TestSchemaAdapterFactory {}))
+            .unwrap();
         let base_conf = FileScanConfigBuilder::new(
             ObjectStoreUrl::local_filesystem(),
             schema,
diff --git a/datafusion/core/src/datasource/physical_plan/arrow_file.rs b/datafusion/core/src/datasource/physical_plan/arrow_file.rs
index 6de72aa8ff..5728746e90 100644
--- a/datafusion/core/src/datasource/physical_plan/arrow_file.rs
+++ b/datafusion/core/src/datasource/physical_plan/arrow_file.rs
@@ -20,8 +20,8 @@ use std::sync::Arc;
 
 use crate::datasource::physical_plan::{FileMeta, FileOpenFuture, FileOpener};
 use crate::error::Result;
+use datafusion_datasource::as_file_source;
 use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
-use datafusion_datasource::{as_file_source, impl_schema_adapter_methods};
 
 use arrow::buffer::Buffer;
 use arrow::datatypes::SchemaRef;
@@ -99,7 +99,19 @@ impl FileSource for ArrowSource {
         "arrow"
     }
 
-    impl_schema_adapter_methods!();
+    fn with_schema_adapter_factory(
+        &self,
+        schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
+    ) -> Result<Arc<dyn FileSource>> {
+        Ok(Arc::new(Self {
+            schema_adapter_factory: Some(schema_adapter_factory),
+            ..self.clone()
+        }))
+    }
+
+    fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
+        self.schema_adapter_factory.clone()
+    }
 }
 
 /// The struct arrow that implements `[FileOpener]` trait
diff --git a/datafusion/core/tests/integration_tests/schema_adapter_integration_tests.rs b/datafusion/core/tests/integration_tests/schema_adapter_integration_tests.rs
index 38c2ee582a..833af04680 100644
--- a/datafusion/core/tests/integration_tests/schema_adapter_integration_tests.rs
+++ b/datafusion/core/tests/integration_tests/schema_adapter_integration_tests.rs
@@ -17,6 +17,7 @@
 
 //! Integration test for schema adapter factory functionality
 
+use std::any::Any;
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
 use datafusion::datasource::object_store::ObjectStoreUrl;
@@ -258,3 +259,187 @@ fn test_schema_adapter_preservation() {
     // Verify the schema adapter factory is present in the file source
     assert!(config.source().schema_adapter_factory().is_some());
 }
+
+
+/// A test source for testing schema adapters
+#[derive(Debug, Clone)]
+struct TestSource {
+    schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
+}
+
+impl TestSource {
+    fn new() -> Self {
+        Self {
+            schema_adapter_factory: None,
+        }
+    }
+}
+
+impl FileSource for TestSource {
+    fn file_type(&self) -> &str {
+        "test"
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn create_file_opener(
+        &self,
+        _store: Arc<dyn ObjectStore>,
+        _conf: &FileScanConfig,
+        _index: usize,
+    ) -> Arc<dyn FileOpener> {
+        unimplemented!("Not needed for this test")
+    }
+
+    fn with_batch_size(&self, _batch_size: usize) -> Arc<dyn FileSource> {
+        Arc::new(self.clone())
+    }
+
+    fn with_schema(&self, _schema: SchemaRef) -> Arc<dyn FileSource> {
+        Arc::new(self.clone())
+    }
+
+    fn with_projection(&self, _projection: &FileScanConfig) -> Arc<dyn FileSource> {
+        Arc::new(self.clone())
+    }
+
+    fn with_statistics(&self, _statistics: Statistics) -> Arc<dyn FileSource> {
+        Arc::new(self.clone())
+    }
+
+    fn metrics(&self) -> &ExecutionPlanMetricsSet {
+        unimplemented!("Not needed for this test")
+    }
+
+    fn statistics(&self) -> Result<Statistics, DataFusionError> {
+        Ok(Statistics::default())
+    }
+
+    fn with_schema_adapter_factory(
+        &self,
+        schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
+    ) -> Result<Arc<dyn FileSource>> {
+        Ok(Arc::new(Self {
+            schema_adapter_factory: Some(schema_adapter_factory),
+        }))
+    }
+
+    fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
+        self.schema_adapter_factory.clone()
+    }
+}
+
+/// A test schema adapter factory
+#[derive(Debug)]
+struct TestSchemaAdapterFactory {}
+
+impl SchemaAdapterFactory for TestSchemaAdapterFactory {
+    fn create(
+        &self,
+        projected_table_schema: SchemaRef,
+        _table_schema: SchemaRef,
+    ) -> Box<dyn SchemaAdapter> {
+        Box::new(TestSchemaAdapter {
+            table_schema: projected_table_schema,
+        })
+    }
+}
+
+/// A test schema adapter implementation
+#[derive(Debug)]
+struct TestSchemaAdapter {
+    table_schema: SchemaRef,
+}
+
+impl SchemaAdapter for TestSchemaAdapter {
+    fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
+        let field = self.table_schema.field(index);
+        file_schema.fields.find(field.name()).map(|(i, _)| i)
+    }
+
+    fn map_schema(
+        &self,
+        file_schema: &Schema,
+    ) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
+        let mut projection = Vec::with_capacity(file_schema.fields().len());
+        for (file_idx, file_field) in file_schema.fields().iter().enumerate() {
+            if self.table_schema.fields().find(file_field.name()).is_some() {
+                projection.push(file_idx);
+            }
+        }
+
+        Ok((Arc::new(TestSchemaMapping {}), projection))
+    }
+}
+
+/// A test schema mapper implementation
+#[derive(Debug)]
+struct TestSchemaMapping {}
+
+impl SchemaMapper for TestSchemaMapping {
+    fn map_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
+        // For testing, just return the original batch
+        Ok(batch)
+    }
+
+    fn map_column_statistics(
+        &self,
+        stats: &[ColumnStatistics],
+    ) -> Result<Vec<ColumnStatistics>> {
+        // For testing, just return the input statistics
+        Ok(stats.to_vec())
+    }
+}
+
+#[test]
+fn test_schema_adapter() {
+    // This test verifies the functionality of the SchemaAdapter and SchemaAdapterFactory
+    // components used in DataFusion's file sources.
+    //
+    // The test specifically checks:
+    // 1. Creating and attaching a schema adapter factory to a file source
+    // 2. Creating a schema adapter using the factory
+    // 3. The schema adapter's ability to map column indices between a table schema and a file schema
+    // 4. The schema adapter's ability to create a projection that selects only the columns
+    //    from the file schema that are present in the table schema
+    //
+    // Schema adapters are used when the schema of data in files doesn't exactly match
+    // the schema expected by the query engine, allowing for field mapping and data transformation.
+
+    // Create a test schema
+    let table_schema = Arc::new(Schema::new(vec![
+        Field::new("id", DataType::Int32, false),
+        Field::new("name", DataType::Utf8, true),
+    ]));
+
+    // Create a file schema
+    let file_schema = Schema::new(vec![
+        Field::new("id", DataType::Int32, false),
+        Field::new("name", DataType::Utf8, true),
+        Field::new("extra", DataType::Int64, true),
+    ]);
+
+    // Create a TestSource
+    let source = TestSource::new();
+    assert!(source.schema_adapter_factory().is_none());
+
+    // Add a schema adapter factory
+    let factory = Arc::new(TestSchemaAdapterFactory {});
+    let source_with_adapter = source.with_schema_adapter_factory(factory).unwrap();
+    assert!(source_with_adapter.schema_adapter_factory().is_some());
+
+    // Create a schema adapter
+    let adapter_factory = source_with_adapter.schema_adapter_factory().unwrap();
+    let adapter =
+        adapter_factory.create(Arc::clone(&table_schema), Arc::clone(&table_schema));
+
+    // Test mapping column index
+    assert_eq!(adapter.map_column_index(0, &file_schema), Some(0));
+    assert_eq!(adapter.map_column_index(1, &file_schema), Some(1));
+
+    // Test creating schema mapper
+    let (_mapper, projection) = adapter.map_schema(&file_schema).unwrap();
+    assert_eq!(projection, vec![0, 1]);
+}
diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs
index dc4d77194c..87fa70c07a 100644
--- a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs
+++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs
@@ -23,9 +23,8 @@ use datafusion_common::{config::ConfigOptions, internal_err, Result, Statistics}
 use datafusion_datasource::{
     file::FileSource, file_meta::FileMeta, file_scan_config::FileScanConfig,
     file_scan_config::FileScanConfigBuilder, file_stream::FileOpenFuture,
-    file_stream::FileOpener, impl_schema_adapter_methods,
-    schema_adapter::DefaultSchemaAdapterFactory, schema_adapter::SchemaAdapterFactory,
-    source::DataSourceExec, PartitionedFile,
+    file_stream::FileOpener, schema_adapter::DefaultSchemaAdapterFactory,
+    schema_adapter::SchemaAdapterFactory, source::DataSourceExec, PartitionedFile,
 };
 use datafusion_physical_expr::conjunction;
 use datafusion_physical_expr_common::physical_expr::fmt_sql;
@@ -232,7 +231,19 @@ impl FileSource for TestSource {
         }
     }
 
-    impl_schema_adapter_methods!();
+    fn with_schema_adapter_factory(
+        &self,
+        schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
+    ) -> Result<Arc<dyn FileSource>> {
+        Ok(Arc::new(Self {
+            schema_adapter_factory: Some(schema_adapter_factory),
+            ..self.clone()
+        }))
+    }
+
+    fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
+        self.schema_adapter_factory.clone()
+    }
 }
 
 #[derive(Debug, Clone)]
diff --git a/datafusion/core/tests/test_adapter_updated.rs b/datafusion/core/tests/test_adapter_updated.rs
deleted file mode 100644
index c85b9a3447..0000000000
--- a/datafusion/core/tests/test_adapter_updated.rs
+++ /dev/null
@@ -1,214 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
-use arrow::record_batch::RecordBatch;
-use datafusion_common::{ColumnStatistics, DataFusionError, Result, Statistics};
-use datafusion_datasource::file::FileSource;
-use datafusion_datasource::file_scan_config::FileScanConfig;
-use datafusion_datasource::file_stream::FileOpener;
-use datafusion_datasource::schema_adapter::{
-    SchemaAdapter, SchemaAdapterFactory, SchemaMapper,
-};
-use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
-use object_store::ObjectStore;
-use std::any::Any;
-use std::fmt::Debug;
-use std::sync::Arc;
-
-/// A test source for testing schema adapters
-#[derive(Debug, Clone)]
-struct TestSource {
-    schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
-}
-
-impl TestSource {
-    fn new() -> Self {
-        Self {
-            schema_adapter_factory: None,
-        }
-    }
-}
-
-impl FileSource for TestSource {
-    fn file_type(&self) -> &str {
-        "test"
-    }
-
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
-    fn create_file_opener(
-        &self,
-        _store: Arc<dyn ObjectStore>,
-        _conf: &FileScanConfig,
-        _index: usize,
-    ) -> Arc<dyn FileOpener> {
-        unimplemented!("Not needed for this test")
-    }
-
-    fn with_batch_size(&self, _batch_size: usize) -> Arc<dyn FileSource> {
-        Arc::new(self.clone())
-    }
-
-    fn with_schema(&self, _schema: SchemaRef) -> Arc<dyn FileSource> {
-        Arc::new(self.clone())
-    }
-
-    fn with_projection(&self, _projection: &FileScanConfig) -> Arc<dyn FileSource> {
-        Arc::new(self.clone())
-    }
-
-    fn with_statistics(&self, _statistics: Statistics) -> Arc<dyn FileSource> {
-        Arc::new(self.clone())
-    }
-
-    fn metrics(&self) -> &ExecutionPlanMetricsSet {
-        unimplemented!("Not needed for this test")
-    }
-
-    fn statistics(&self) -> Result<Statistics, DataFusionError> {
-        Ok(Statistics::default())
-    }
-
-    fn with_schema_adapter_factory(
-        &self,
-        schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
-    ) -> Arc<dyn FileSource> {
-        Arc::new(Self {
-            schema_adapter_factory: Some(schema_adapter_factory),
-        })
-    }
-
-    fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
-        self.schema_adapter_factory.clone()
-    }
-}
-
-/// A test schema adapter factory
-#[derive(Debug)]
-struct TestSchemaAdapterFactory {}
-
-impl SchemaAdapterFactory for TestSchemaAdapterFactory {
-    fn create(
-        &self,
-        projected_table_schema: SchemaRef,
-        _table_schema: SchemaRef,
-    ) -> Box<dyn SchemaAdapter> {
-        Box::new(TestSchemaAdapter {
-            table_schema: projected_table_schema,
-        })
-    }
-}
-
-/// A test schema adapter implementation
-#[derive(Debug)]
-struct TestSchemaAdapter {
-    table_schema: SchemaRef,
-}
-
-impl SchemaAdapter for TestSchemaAdapter {
-    fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
-        let field = self.table_schema.field(index);
-        file_schema.fields.find(field.name()).map(|(i, _)| i)
-    }
-
-    fn map_schema(
-        &self,
-        file_schema: &Schema,
-    ) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
-        let mut projection = Vec::with_capacity(file_schema.fields().len());
-        for (file_idx, file_field) in file_schema.fields().iter().enumerate() {
-            if self.table_schema.fields().find(file_field.name()).is_some() {
-                projection.push(file_idx);
-            }
-        }
-
-        Ok((Arc::new(TestSchemaMapping {}), projection))
-    }
-}
-
-/// A test schema mapper implementation
-#[derive(Debug)]
-struct TestSchemaMapping {}
-
-impl SchemaMapper for TestSchemaMapping {
-    fn map_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
-        // For testing, just return the original batch
-        Ok(batch)
-    }
-
-    fn map_column_statistics(
-        &self,
-        stats: &[ColumnStatistics],
-    ) -> Result<Vec<ColumnStatistics>> {
-        // For testing, just return the input statistics
-        Ok(stats.to_vec())
-    }
-}
-
-#[test]
-fn test_schema_adapter() {
-    // This test verifies the functionality of the SchemaAdapter and SchemaAdapterFactory
-    // components used in DataFusion's file sources.
-    //
-    // The test specifically checks:
-    // 1. Creating and attaching a schema adapter factory to a file source
-    // 2. Creating a schema adapter using the factory
-    // 3. The schema adapter's ability to map column indices between a table schema and a file schema
-    // 4. The schema adapter's ability to create a projection that selects only the columns
-    //    from the file schema that are present in the table schema
-    //
-    // Schema adapters are used when the schema of data in files doesn't exactly match
-    // the schema expected by the query engine, allowing for field mapping and data transformation.
-
-    // Create a test schema
-    let table_schema = Arc::new(Schema::new(vec![
-        Field::new("id", DataType::Int32, false),
-        Field::new("name", DataType::Utf8, true),
-    ]));
-
-    // Create a file schema
-    let file_schema = Schema::new(vec![
-        Field::new("id", DataType::Int32, false),
-        Field::new("name", DataType::Utf8, true),
-        Field::new("extra", DataType::Int64, true),
-    ]);
-
-    // Create a TestSource
-    let source = TestSource::new();
-    assert!(source.schema_adapter_factory().is_none());
-
-    // Add a schema adapter factory
-    let factory = Arc::new(TestSchemaAdapterFactory {});
-    let source_with_adapter = source.with_schema_adapter_factory(factory);
-    assert!(source_with_adapter.schema_adapter_factory().is_some());
-
-    // Create a schema adapter
-    let adapter_factory = source_with_adapter.schema_adapter_factory().unwrap();
-    let adapter =
-        adapter_factory.create(Arc::clone(&table_schema), Arc::clone(&table_schema));
-
-    // Test mapping column index
-    assert_eq!(adapter.map_column_index(0, &file_schema), Some(0));
-    assert_eq!(adapter.map_column_index(1, &file_schema), Some(1));
-
-    // Test creating schema mapper
-    let (_mapper, projection) = adapter.map_schema(&file_schema).unwrap();
-    assert_eq!(projection, vec![0, 1]);
-}
diff --git a/datafusion/datasource-avro/src/source.rs b/datafusion/datasource-avro/src/source.rs
index 2fdf34b3cc..3254f48bab 100644
--- a/datafusion/datasource-avro/src/source.rs
+++ b/datafusion/datasource-avro/src/source.rs
@@ -28,7 +28,6 @@ use datafusion_common::Statistics;
 use datafusion_datasource::file::FileSource;
 use datafusion_datasource::file_scan_config::FileScanConfig;
 use datafusion_datasource::file_stream::FileOpener;
-use datafusion_datasource::impl_schema_adapter_methods;
 use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
 use datafusion_physical_expr_common::sort_expr::LexOrdering;
 use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
@@ -127,7 +126,19 @@ impl FileSource for AvroSource {
         Ok(None)
     }
 
-    impl_schema_adapter_methods!();
+    fn with_schema_adapter_factory(
+        &self,
+        schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
+    ) -> Result<Arc<dyn FileSource>> {
+        Ok(Arc::new(Self {
+            schema_adapter_factory: Some(schema_adapter_factory),
+            ..self.clone()
+        }))
+    }
+
+    fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
+        self.schema_adapter_factory.clone()
+    }
 }
 
 mod private {
diff --git a/datafusion/datasource-csv/src/source.rs b/datafusion/datasource-csv/src/source.rs
index d45080dc20..3af1f2b345 100644
--- a/datafusion/datasource-csv/src/source.rs
+++ b/datafusion/datasource-csv/src/source.rs
@@ -29,8 +29,7 @@ use datafusion_datasource::file_compression_type::FileCompressionType;
 use datafusion_datasource::file_meta::FileMeta;
 use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
 use datafusion_datasource::{
-    as_file_source, calculate_range, impl_schema_adapter_methods, FileRange,
-    ListingTableUrl, RangeCalculation,
+    as_file_source, calculate_range, FileRange, ListingTableUrl, RangeCalculation,
 };
 
 use arrow::csv;
@@ -284,7 +283,19 @@ impl FileSource for CsvSource {
         }
     }
 
-    impl_schema_adapter_methods!();
+    fn with_schema_adapter_factory(
+        &self,
+        schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
+    ) -> Result<Arc<dyn FileSource>> {
+        Ok(Arc::new(Self {
+            schema_adapter_factory: Some(schema_adapter_factory),
+            ..self.clone()
+        }))
+    }
+
+    fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
+        self.schema_adapter_factory.clone()
+    }
 }
 
 impl FileOpener for CsvOpener {
diff --git a/datafusion/datasource-json/src/source.rs b/datafusion/datasource-json/src/source.rs
index 187876522e..af37e1033e 100644
--- a/datafusion/datasource-json/src/source.rs
+++ b/datafusion/datasource-json/src/source.rs
@@ -32,8 +32,7 @@ use datafusion_datasource::file_meta::FileMeta;
 use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
 use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
 use datafusion_datasource::{
-    as_file_source, calculate_range, impl_schema_adapter_methods, ListingTableUrl,
-    RangeCalculation,
+    as_file_source, calculate_range, ListingTableUrl, RangeCalculation,
 };
 use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
 
@@ -151,7 +150,20 @@ impl FileSource for JsonSource {
     fn file_type(&self) -> &str {
         "json"
     }
-    impl_schema_adapter_methods!();
+
+    fn with_schema_adapter_factory(
+        &self,
+        schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
+    ) -> Result<Arc<dyn FileSource>> {
+        Ok(Arc::new(Self {
+            schema_adapter_factory: Some(schema_adapter_factory),
+            ..self.clone()
+        }))
+    }
+
+    fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
+        self.schema_adapter_factory.clone()
+    }
 }
 
 impl FileOpener for JsonOpener {
diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs
index 851e336443..647fbc8d05 100644
--- a/datafusion/datasource-parquet/src/file_format.rs
+++ b/datafusion/datasource-parquet/src/file_format.rs
@@ -423,7 +423,7 @@ impl FileFormat for ParquetFormat {
             source = source.with_metadata_size_hint(metadata_size_hint)
         }
         // Apply schema adapter factory before building the new config
-        let file_source = source.apply_schema_adapter(&conf);
+        let file_source = source.apply_schema_adapter(&conf)?;
 
         let conf = FileScanConfigBuilder::from(conf)
             .with_source(file_source)
diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs
index 30b774d08f..0412288d68 100644
--- a/datafusion/datasource-parquet/src/source.rs
+++ b/datafusion/datasource-parquet/src/source.rs
@@ -29,7 +29,6 @@ use crate::ParquetFileReaderFactory;
 use datafusion_common::config::ConfigOptions;
 use datafusion_datasource::as_file_source;
 use datafusion_datasource::file_stream::FileOpener;
-use datafusion_datasource::impl_schema_adapter_methods;
 use datafusion_datasource::schema_adapter::{
     DefaultSchemaAdapterFactory, SchemaAdapterFactory,
 };
@@ -415,7 +414,10 @@ impl ParquetSource {
     /// * `conf` - FileScanConfig that may contain a schema adapter factory
     /// # Returns
     /// The converted FileSource with schema adapter factory applied if provided
-    pub fn apply_schema_adapter(self, conf: &FileScanConfig) -> Arc<dyn FileSource> {
+    pub fn apply_schema_adapter(
+        self,
+        conf: &FileScanConfig,
+    ) -> datafusion_common::Result<Arc<dyn FileSource>> {
         let file_source: Arc<dyn FileSource> = self.into();
 
         // If the FileScanConfig.file_source() has a schema adapter factory, apply it
@@ -424,7 +426,7 @@ impl ParquetSource {
                 Arc::<dyn SchemaAdapterFactory>::clone(&factory),
             )
         } else {
-            file_source
+            Ok(file_source)
         }
     }
 }
@@ -664,5 +666,18 @@ impl FileSource for ParquetSource {
         );
         Ok(FilterPushdownPropagation::with_filters(filters).with_updated_node(source))
     }
-    impl_schema_adapter_methods!();
+
+    fn with_schema_adapter_factory(
+        &self,
+        schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
+    ) -> datafusion_common::Result<Arc<dyn FileSource>> {
+        Ok(Arc::new(Self {
+            schema_adapter_factory: Some(schema_adapter_factory),
+            ..self.clone()
+        }))
+    }
+
+    fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
+        self.schema_adapter_factory.clone()
+    }
 }
diff --git a/datafusion/datasource-parquet/tests/apply_schema_adapter_tests.rs b/datafusion/datasource-parquet/tests/apply_schema_adapter_tests.rs
index 89406fb742..955cd224e6 100644
--- a/datafusion/datasource-parquet/tests/apply_schema_adapter_tests.rs
+++ b/datafusion/datasource-parquet/tests/apply_schema_adapter_tests.rs
@@ -141,7 +141,7 @@ mod parquet_adapter_tests {
             prefix: "test_".to_string(),
         });
 
-        let file_source = source.clone().with_schema_adapter_factory(factory);
+        let file_source = source.clone().with_schema_adapter_factory(factory).unwrap();
 
         let config = FileScanConfigBuilder::new(
             ObjectStoreUrl::local_filesystem(),
@@ -151,7 +151,7 @@ mod parquet_adapter_tests {
         .build();
 
         // Apply schema adapter to a new source
-        let result_source = source.apply_schema_adapter(&config);
+        let result_source = source.apply_schema_adapter(&config).unwrap();
 
         // Verify the adapter was applied
         assert!(result_source.schema_adapter_factory().is_some());
@@ -198,7 +198,7 @@ mod parquet_adapter_tests {
         .build();
 
         // Apply schema adapter function - should pass through the source unchanged
-        let result_source = source.apply_schema_adapter(&config);
+        let result_source = source.apply_schema_adapter(&config).unwrap();
 
         // Verify no adapter was applied
         assert!(result_source.schema_adapter_factory().is_none());
diff --git a/datafusion/datasource/src/file.rs b/datafusion/datasource/src/file.rs
index d0557e9f08..c5f21ebf1a 100644
--- a/datafusion/datasource/src/file.rs
+++ b/datafusion/datasource/src/file.rs
@@ -28,7 +28,7 @@ use crate::file_stream::FileOpener;
 use crate::schema_adapter::SchemaAdapterFactory;
 use arrow::datatypes::SchemaRef;
 use datafusion_common::config::ConfigOptions;
-use datafusion_common::{Result, Statistics};
+use datafusion_common::{not_impl_err, Result, Statistics};
 use datafusion_physical_expr::{LexOrdering, PhysicalExpr};
 use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation;
 use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
@@ -126,19 +126,26 @@ pub trait FileSource: Send + Sync {
     /// Set optional schema adapter factory.
     ///
     /// [`SchemaAdapterFactory`] allows user to specify how fields from the
-    /// file get mapped to that of the table schema. The default implementation
-    /// returns the original source.
+    /// file get mapped to that of the table schema.  If you implement this
+    /// method, you should also implement [`schema_adapter_factory`].
     ///
-    /// Note: You can implement this method and `schema_adapter_factory`
-    /// automatically using the [`crate::impl_schema_adapter_methods`] macro.
+    /// The default implementation returns a not implemented error.
+    ///
+    /// [`schema_adapter_factory`]: Self::schema_adapter_factory
     fn with_schema_adapter_factory(
         &self,
-        factory: Arc<dyn SchemaAdapterFactory>,
-    ) -> Arc<dyn FileSource>;
+        _factory: Arc<dyn SchemaAdapterFactory>,
+    ) -> Result<Arc<dyn FileSource>> {
+        not_impl_err!(
+            "FileSource {} does not support schema adapter factory",
+            self.file_type()
+        )
+    }
 
     /// Returns the current schema adapter factory if set
     ///
-    /// Note: You can implement this method and `with_schema_adapter_factory`
-    /// automatically using the [`crate::impl_schema_adapter_methods`] macro.
-    fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>>;
+    /// Default implementation returns `None`.
+    fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
+        None
+    }
 }
diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs
index bbc5efb4d0..b6cdd05e52 100644
--- a/datafusion/datasource/src/file_scan_config.rs
+++ b/datafusion/datasource/src/file_scan_config.rs
@@ -76,6 +76,7 @@ use log::{debug, warn};
 /// # use arrow::datatypes::{Field, Fields, DataType, Schema, SchemaRef};
 /// # use object_store::ObjectStore;
 /// # use datafusion_common::Statistics;
+/// # use datafusion_common::Result;
 /// # use datafusion_datasource::file::FileSource;
 /// # use datafusion_datasource::file_groups::FileGroup;
 /// # use datafusion_datasource::PartitionedFile;
@@ -106,9 +107,9 @@ use log::{debug, warn};
 /// #  fn with_projection(&self, _: &FileScanConfig) -> Arc<dyn FileSource> { unimplemented!() }
 /// #  fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> { Arc::new(Self {projected_statistics: Some(statistics), schema_adapter_factory: self.schema_adapter_factory.clone()} ) }
 /// #  fn metrics(&self) -> &ExecutionPlanMetricsSet { unimplemented!() }
-/// #  fn statistics(&self) -> datafusion_common::Result<Statistics> { Ok(self.projected_statistics.clone().expect("projected_statistics should be set")) }
+/// #  fn statistics(&self) -> Result<Statistics> { Ok(self.projected_statistics.clone().expect("projected_statistics should be set")) }
 /// #  fn file_type(&self) -> &str { "parquet" }
-/// #  fn with_schema_adapter_factory(&self, factory: Arc<dyn SchemaAdapterFactory>) -> Arc<dyn FileSource> { Arc::new(Self {projected_statistics: self.projected_statistics.clone(), schema_adapter_factory: Some(factory)} ) }
+/// #  fn with_schema_adapter_factory(&self, factory: Arc<dyn SchemaAdapterFactory>) -> Result<Arc<dyn FileSource>> { Ok(Arc::new(Self {projected_statistics: self.projected_statistics.clone(), schema_adapter_factory: Some(factory)} )) }
 /// #  fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> { self.schema_adapter_factory.clone() }
 /// #  }
 /// # impl ParquetSource {
diff --git a/datafusion/datasource/src/macros.rs b/datafusion/datasource/src/macros.rs
deleted file mode 100644
index c7a4058f23..0000000000
--- a/datafusion/datasource/src/macros.rs
+++ /dev/null
@@ -1,145 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Macros for the datafusion-datasource crate
-
-/// Helper macro to generate schema adapter methods for FileSource implementations
-///
-/// Place this inside *any* `impl FileSource for YourType { … }` to
-/// avoid copy-pasting `with_schema_adapter_factory` and
-/// `schema_adapter_factory`.
-///
-/// # Availability
-///
-/// This macro is exported at the crate root level via `#[macro_export]`, so it can be
-/// imported directly from the crate:
-///
-/// ```rust,no_run
-/// use datafusion_datasource::impl_schema_adapter_methods;
-/// ```
-///
-/// # Note on path resolution
-/// When this macro is used:
-/// - `$crate` expands to `datafusion_datasource` (the crate root)
-/// - `$crate::file::FileSource` refers to the FileSource trait from this crate
-/// - `$crate::schema_adapter::SchemaAdapterFactory` refers to the SchemaAdapterFactory trait
-///
-/// # Example Usage
-///
-/// ```rust,no_run
-/// use std::sync::Arc;
-/// use std::any::Any;
-/// use std::fmt::{Formatter, Display, self};
-/// use arrow::datatypes::SchemaRef;
-/// use datafusion_common::{Result, Statistics};
-/// use object_store::ObjectStore;
-/// use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
-/// use datafusion_physical_plan::DisplayFormatType;
-/// use datafusion_physical_expr_common::sort_expr::LexOrdering;
-/// use datafusion_datasource::file::FileSource;
-/// use datafusion_datasource::file_stream::FileOpener;
-/// use datafusion_datasource::file_scan_config::FileScanConfig;
-/// use datafusion_datasource::impl_schema_adapter_methods;
-/// use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
-///
-/// #[derive(Clone)]
-/// struct MyFileSource {
-///     schema: SchemaRef,
-///     batch_size: usize,
-///     statistics: Statistics,
-///     projection: Option<Vec<usize>>,
-///     schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
-///     metrics: ExecutionPlanMetricsSet,
-/// }
-///
-/// impl FileSource for MyFileSource {
-///     fn create_file_opener(
-///         &self,
-///         object_store: Arc<dyn ObjectStore>,
-///         base_config: &FileScanConfig,
-///         partition: usize,
-///     ) -> Arc<dyn FileOpener> {
-///         // Implementation here
-///         unimplemented!()
-///     }
-///     
-///     fn as_any(&self) -> &dyn Any {
-///         self
-///     }
-///     
-///     fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
-///         let mut new_source = self.clone();
-///         new_source.batch_size = batch_size;
-///         Arc::new(new_source)
-///     }
-///     
-///     fn with_schema(&self, schema: SchemaRef) -> Arc<dyn FileSource> {
-///         let mut new_source = self.clone();
-///         new_source.schema = schema;
-///         Arc::new(new_source)
-///     }
-///     
-///     fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource> {
-///         let mut new_source = self.clone();
-///         new_source.projection = config.file_column_projection_indices();
-///         Arc::new(new_source)
-///     }
-///     
-///     fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
-///         let mut new_source = self.clone();
-///         new_source.statistics = statistics;
-///         Arc::new(new_source)
-///     }
-///     
-///     fn metrics(&self) -> &ExecutionPlanMetricsSet {
-///         &self.metrics
-///     }
-///     
-///     fn statistics(&self) -> Result<Statistics> {
-///         Ok(self.statistics.clone())
-///     }
-///     
-///     fn file_type(&self) -> &str {
-///         "my_file_type"
-///     }
-///     
-///     // Use the macro to implement schema adapter methods
-///     impl_schema_adapter_methods!();
-/// }
-/// ```
-#[macro_export(local_inner_macros)]
-macro_rules! impl_schema_adapter_methods {
-    () => {
-        fn with_schema_adapter_factory(
-            &self,
-            schema_adapter_factory: std::sync::Arc<
-                dyn $crate::schema_adapter::SchemaAdapterFactory,
-            >,
-        ) -> std::sync::Arc<dyn $crate::file::FileSource> {
-            std::sync::Arc::new(Self {
-                schema_adapter_factory: Some(schema_adapter_factory),
-                ..self.clone()
-            })
-        }
-
-        fn schema_adapter_factory(
-            &self,
-        ) -> Option<std::sync::Arc<dyn $crate::schema_adapter::SchemaAdapterFactory>> {
-            self.schema_adapter_factory.clone()
-        }
-    };
-}
diff --git a/datafusion/datasource/src/mod.rs b/datafusion/datasource/src/mod.rs
index 1c27cd4922..c79efd11fc 100644
--- a/datafusion/datasource/src/mod.rs
+++ b/datafusion/datasource/src/mod.rs
@@ -37,7 +37,6 @@ pub mod file_meta;
 pub mod file_scan_config;
 pub mod file_sink_config;
 pub mod file_stream;
-pub mod macros;
 pub mod memory;
 pub mod schema_adapter;
 pub mod sink;
diff --git a/datafusion/datasource/src/test_util.rs b/datafusion/datasource/src/test_util.rs
index aac61c7812..e4a5114aa0 100644
--- a/datafusion/datasource/src/test_util.rs
+++ b/datafusion/datasource/src/test_util.rs
@@ -17,7 +17,7 @@
 
 use crate::{
     file::FileSource, file_scan_config::FileScanConfig, file_stream::FileOpener,
-    impl_schema_adapter_methods, schema_adapter::SchemaAdapterFactory,
+    schema_adapter::SchemaAdapterFactory,
 };
 
 use std::sync::Arc;
@@ -84,7 +84,19 @@ impl FileSource for MockSource {
         "mock"
     }
 
-    impl_schema_adapter_methods!();
+    fn with_schema_adapter_factory(
+        &self,
+        schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
+    ) -> Result<Arc<dyn FileSource>> {
+        Ok(Arc::new(Self {
+            schema_adapter_factory: Some(schema_adapter_factory),
+            ..self.clone()
+        }))
+    }
+
+    fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
+        self.schema_adapter_factory.clone()
+    }
 }
 
 /// Create a column expression


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org


Reply via email to