This is an automated email from the ASF dual-hosted git repository. xudong963 pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push: new 3236cc04ac Simplify FileSource / SchemaAdapterFactory API (#16214) 3236cc04ac is described below commit 3236cc04ac5c05e37e0fc51ea3c22c8a543ff332 Author: Andrew Lamb <and...@nerdnetworks.org> AuthorDate: Tue Jun 3 22:10:00 2025 -0400 Simplify FileSource / SchemaAdapterFactory API (#16214) * Make FileFormat::with_schema_adapter_factory fallible, remove macros * Remove standalone integration test * Update doc test --- datafusion/core/src/datasource/mod.rs | 3 +- .../src/datasource/physical_plan/arrow_file.rs | 16 +- .../schema_adapter_integration_tests.rs | 185 ++++++++++++++++++ .../physical_optimizer/filter_pushdown/util.rs | 19 +- datafusion/core/tests/test_adapter_updated.rs | 214 --------------------- datafusion/datasource-avro/src/source.rs | 15 +- datafusion/datasource-csv/src/source.rs | 17 +- datafusion/datasource-json/src/source.rs | 18 +- datafusion/datasource-parquet/src/file_format.rs | 2 +- datafusion/datasource-parquet/src/source.rs | 23 ++- .../tests/apply_schema_adapter_tests.rs | 6 +- datafusion/datasource/src/file.rs | 27 ++- datafusion/datasource/src/file_scan_config.rs | 5 +- datafusion/datasource/src/macros.rs | 145 -------------- datafusion/datasource/src/mod.rs | 1 - datafusion/datasource/src/test_util.rs | 16 +- 16 files changed, 315 insertions(+), 397 deletions(-) diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs index f0c6771515..b3d69064ff 100644 --- a/datafusion/core/src/datasource/mod.rs +++ b/datafusion/core/src/datasource/mod.rs @@ -122,7 +122,8 @@ mod tests { let schema = Arc::new(Schema::new(vec![f1.clone(), f2.clone()])); let source = ParquetSource::default() - .with_schema_adapter_factory(Arc::new(TestSchemaAdapterFactory {})); + .with_schema_adapter_factory(Arc::new(TestSchemaAdapterFactory {})) + .unwrap(); let base_conf = FileScanConfigBuilder::new( ObjectStoreUrl::local_filesystem(), schema, diff --git a/datafusion/core/src/datasource/physical_plan/arrow_file.rs b/datafusion/core/src/datasource/physical_plan/arrow_file.rs index 6de72aa8ff..5728746e90 100644 --- a/datafusion/core/src/datasource/physical_plan/arrow_file.rs +++ b/datafusion/core/src/datasource/physical_plan/arrow_file.rs @@ -20,8 +20,8 @@ use std::sync::Arc; use crate::datasource::physical_plan::{FileMeta, FileOpenFuture, FileOpener}; use crate::error::Result; +use datafusion_datasource::as_file_source; use datafusion_datasource::schema_adapter::SchemaAdapterFactory; -use datafusion_datasource::{as_file_source, impl_schema_adapter_methods}; use arrow::buffer::Buffer; use arrow::datatypes::SchemaRef; @@ -99,7 +99,19 @@ impl FileSource for ArrowSource { "arrow" } - impl_schema_adapter_methods!(); + fn with_schema_adapter_factory( + &self, + schema_adapter_factory: Arc<dyn SchemaAdapterFactory>, + ) -> Result<Arc<dyn FileSource>> { + Ok(Arc::new(Self { + schema_adapter_factory: Some(schema_adapter_factory), + ..self.clone() + })) + } + + fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> { + self.schema_adapter_factory.clone() + } } /// The struct arrow that implements `[FileOpener]` trait diff --git a/datafusion/core/tests/integration_tests/schema_adapter_integration_tests.rs b/datafusion/core/tests/integration_tests/schema_adapter_integration_tests.rs index 38c2ee582a..833af04680 100644 --- a/datafusion/core/tests/integration_tests/schema_adapter_integration_tests.rs +++ b/datafusion/core/tests/integration_tests/schema_adapter_integration_tests.rs @@ -17,6 +17,7 @@ //! Integration test for schema adapter factory functionality +use std::any::Any; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use datafusion::datasource::object_store::ObjectStoreUrl; @@ -258,3 +259,187 @@ fn test_schema_adapter_preservation() { // Verify the schema adapter factory is present in the file source assert!(config.source().schema_adapter_factory().is_some()); } + + +/// A test source for testing schema adapters +#[derive(Debug, Clone)] +struct TestSource { + schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>, +} + +impl TestSource { + fn new() -> Self { + Self { + schema_adapter_factory: None, + } + } +} + +impl FileSource for TestSource { + fn file_type(&self) -> &str { + "test" + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn create_file_opener( + &self, + _store: Arc<dyn ObjectStore>, + _conf: &FileScanConfig, + _index: usize, + ) -> Arc<dyn FileOpener> { + unimplemented!("Not needed for this test") + } + + fn with_batch_size(&self, _batch_size: usize) -> Arc<dyn FileSource> { + Arc::new(self.clone()) + } + + fn with_schema(&self, _schema: SchemaRef) -> Arc<dyn FileSource> { + Arc::new(self.clone()) + } + + fn with_projection(&self, _projection: &FileScanConfig) -> Arc<dyn FileSource> { + Arc::new(self.clone()) + } + + fn with_statistics(&self, _statistics: Statistics) -> Arc<dyn FileSource> { + Arc::new(self.clone()) + } + + fn metrics(&self) -> &ExecutionPlanMetricsSet { + unimplemented!("Not needed for this test") + } + + fn statistics(&self) -> Result<Statistics, DataFusionError> { + Ok(Statistics::default()) + } + + fn with_schema_adapter_factory( + &self, + schema_adapter_factory: Arc<dyn SchemaAdapterFactory>, + ) -> Result<Arc<dyn FileSource>> { + Ok(Arc::new(Self { + schema_adapter_factory: Some(schema_adapter_factory), + })) + } + + fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> { + self.schema_adapter_factory.clone() + } +} + +/// A test schema adapter factory +#[derive(Debug)] +struct TestSchemaAdapterFactory {} + +impl SchemaAdapterFactory for TestSchemaAdapterFactory { + fn create( + &self, + projected_table_schema: SchemaRef, + _table_schema: SchemaRef, + ) -> Box<dyn SchemaAdapter> { + Box::new(TestSchemaAdapter { + table_schema: projected_table_schema, + }) + } +} + +/// A test schema adapter implementation +#[derive(Debug)] +struct TestSchemaAdapter { + table_schema: SchemaRef, +} + +impl SchemaAdapter for TestSchemaAdapter { + fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> { + let field = self.table_schema.field(index); + file_schema.fields.find(field.name()).map(|(i, _)| i) + } + + fn map_schema( + &self, + file_schema: &Schema, + ) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> { + let mut projection = Vec::with_capacity(file_schema.fields().len()); + for (file_idx, file_field) in file_schema.fields().iter().enumerate() { + if self.table_schema.fields().find(file_field.name()).is_some() { + projection.push(file_idx); + } + } + + Ok((Arc::new(TestSchemaMapping {}), projection)) + } +} + +/// A test schema mapper implementation +#[derive(Debug)] +struct TestSchemaMapping {} + +impl SchemaMapper for TestSchemaMapping { + fn map_batch(&self, batch: RecordBatch) -> Result<RecordBatch> { + // For testing, just return the original batch + Ok(batch) + } + + fn map_column_statistics( + &self, + stats: &[ColumnStatistics], + ) -> Result<Vec<ColumnStatistics>> { + // For testing, just return the input statistics + Ok(stats.to_vec()) + } +} + +#[test] +fn test_schema_adapter() { + // This test verifies the functionality of the SchemaAdapter and SchemaAdapterFactory + // components used in DataFusion's file sources. + // + // The test specifically checks: + // 1. Creating and attaching a schema adapter factory to a file source + // 2. Creating a schema adapter using the factory + // 3. The schema adapter's ability to map column indices between a table schema and a file schema + // 4. The schema adapter's ability to create a projection that selects only the columns + // from the file schema that are present in the table schema + // + // Schema adapters are used when the schema of data in files doesn't exactly match + // the schema expected by the query engine, allowing for field mapping and data transformation. + + // Create a test schema + let table_schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("name", DataType::Utf8, true), + ])); + + // Create a file schema + let file_schema = Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("name", DataType::Utf8, true), + Field::new("extra", DataType::Int64, true), + ]); + + // Create a TestSource + let source = TestSource::new(); + assert!(source.schema_adapter_factory().is_none()); + + // Add a schema adapter factory + let factory = Arc::new(TestSchemaAdapterFactory {}); + let source_with_adapter = source.with_schema_adapter_factory(factory).unwrap(); + assert!(source_with_adapter.schema_adapter_factory().is_some()); + + // Create a schema adapter + let adapter_factory = source_with_adapter.schema_adapter_factory().unwrap(); + let adapter = + adapter_factory.create(Arc::clone(&table_schema), Arc::clone(&table_schema)); + + // Test mapping column index + assert_eq!(adapter.map_column_index(0, &file_schema), Some(0)); + assert_eq!(adapter.map_column_index(1, &file_schema), Some(1)); + + // Test creating schema mapper + let (_mapper, projection) = adapter.map_schema(&file_schema).unwrap(); + assert_eq!(projection, vec![0, 1]); +} diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs index dc4d77194c..87fa70c07a 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs @@ -23,9 +23,8 @@ use datafusion_common::{config::ConfigOptions, internal_err, Result, Statistics} use datafusion_datasource::{ file::FileSource, file_meta::FileMeta, file_scan_config::FileScanConfig, file_scan_config::FileScanConfigBuilder, file_stream::FileOpenFuture, - file_stream::FileOpener, impl_schema_adapter_methods, - schema_adapter::DefaultSchemaAdapterFactory, schema_adapter::SchemaAdapterFactory, - source::DataSourceExec, PartitionedFile, + file_stream::FileOpener, schema_adapter::DefaultSchemaAdapterFactory, + schema_adapter::SchemaAdapterFactory, source::DataSourceExec, PartitionedFile, }; use datafusion_physical_expr::conjunction; use datafusion_physical_expr_common::physical_expr::fmt_sql; @@ -232,7 +231,19 @@ impl FileSource for TestSource { } } - impl_schema_adapter_methods!(); + fn with_schema_adapter_factory( + &self, + schema_adapter_factory: Arc<dyn SchemaAdapterFactory>, + ) -> Result<Arc<dyn FileSource>> { + Ok(Arc::new(Self { + schema_adapter_factory: Some(schema_adapter_factory), + ..self.clone() + })) + } + + fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> { + self.schema_adapter_factory.clone() + } } #[derive(Debug, Clone)] diff --git a/datafusion/core/tests/test_adapter_updated.rs b/datafusion/core/tests/test_adapter_updated.rs deleted file mode 100644 index c85b9a3447..0000000000 --- a/datafusion/core/tests/test_adapter_updated.rs +++ /dev/null @@ -1,214 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; -use arrow::record_batch::RecordBatch; -use datafusion_common::{ColumnStatistics, DataFusionError, Result, Statistics}; -use datafusion_datasource::file::FileSource; -use datafusion_datasource::file_scan_config::FileScanConfig; -use datafusion_datasource::file_stream::FileOpener; -use datafusion_datasource::schema_adapter::{ - SchemaAdapter, SchemaAdapterFactory, SchemaMapper, -}; -use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; -use object_store::ObjectStore; -use std::any::Any; -use std::fmt::Debug; -use std::sync::Arc; - -/// A test source for testing schema adapters -#[derive(Debug, Clone)] -struct TestSource { - schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>, -} - -impl TestSource { - fn new() -> Self { - Self { - schema_adapter_factory: None, - } - } -} - -impl FileSource for TestSource { - fn file_type(&self) -> &str { - "test" - } - - fn as_any(&self) -> &dyn Any { - self - } - - fn create_file_opener( - &self, - _store: Arc<dyn ObjectStore>, - _conf: &FileScanConfig, - _index: usize, - ) -> Arc<dyn FileOpener> { - unimplemented!("Not needed for this test") - } - - fn with_batch_size(&self, _batch_size: usize) -> Arc<dyn FileSource> { - Arc::new(self.clone()) - } - - fn with_schema(&self, _schema: SchemaRef) -> Arc<dyn FileSource> { - Arc::new(self.clone()) - } - - fn with_projection(&self, _projection: &FileScanConfig) -> Arc<dyn FileSource> { - Arc::new(self.clone()) - } - - fn with_statistics(&self, _statistics: Statistics) -> Arc<dyn FileSource> { - Arc::new(self.clone()) - } - - fn metrics(&self) -> &ExecutionPlanMetricsSet { - unimplemented!("Not needed for this test") - } - - fn statistics(&self) -> Result<Statistics, DataFusionError> { - Ok(Statistics::default()) - } - - fn with_schema_adapter_factory( - &self, - schema_adapter_factory: Arc<dyn SchemaAdapterFactory>, - ) -> Arc<dyn FileSource> { - Arc::new(Self { - schema_adapter_factory: Some(schema_adapter_factory), - }) - } - - fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> { - self.schema_adapter_factory.clone() - } -} - -/// A test schema adapter factory -#[derive(Debug)] -struct TestSchemaAdapterFactory {} - -impl SchemaAdapterFactory for TestSchemaAdapterFactory { - fn create( - &self, - projected_table_schema: SchemaRef, - _table_schema: SchemaRef, - ) -> Box<dyn SchemaAdapter> { - Box::new(TestSchemaAdapter { - table_schema: projected_table_schema, - }) - } -} - -/// A test schema adapter implementation -#[derive(Debug)] -struct TestSchemaAdapter { - table_schema: SchemaRef, -} - -impl SchemaAdapter for TestSchemaAdapter { - fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> { - let field = self.table_schema.field(index); - file_schema.fields.find(field.name()).map(|(i, _)| i) - } - - fn map_schema( - &self, - file_schema: &Schema, - ) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> { - let mut projection = Vec::with_capacity(file_schema.fields().len()); - for (file_idx, file_field) in file_schema.fields().iter().enumerate() { - if self.table_schema.fields().find(file_field.name()).is_some() { - projection.push(file_idx); - } - } - - Ok((Arc::new(TestSchemaMapping {}), projection)) - } -} - -/// A test schema mapper implementation -#[derive(Debug)] -struct TestSchemaMapping {} - -impl SchemaMapper for TestSchemaMapping { - fn map_batch(&self, batch: RecordBatch) -> Result<RecordBatch> { - // For testing, just return the original batch - Ok(batch) - } - - fn map_column_statistics( - &self, - stats: &[ColumnStatistics], - ) -> Result<Vec<ColumnStatistics>> { - // For testing, just return the input statistics - Ok(stats.to_vec()) - } -} - -#[test] -fn test_schema_adapter() { - // This test verifies the functionality of the SchemaAdapter and SchemaAdapterFactory - // components used in DataFusion's file sources. - // - // The test specifically checks: - // 1. Creating and attaching a schema adapter factory to a file source - // 2. Creating a schema adapter using the factory - // 3. The schema adapter's ability to map column indices between a table schema and a file schema - // 4. The schema adapter's ability to create a projection that selects only the columns - // from the file schema that are present in the table schema - // - // Schema adapters are used when the schema of data in files doesn't exactly match - // the schema expected by the query engine, allowing for field mapping and data transformation. - - // Create a test schema - let table_schema = Arc::new(Schema::new(vec![ - Field::new("id", DataType::Int32, false), - Field::new("name", DataType::Utf8, true), - ])); - - // Create a file schema - let file_schema = Schema::new(vec![ - Field::new("id", DataType::Int32, false), - Field::new("name", DataType::Utf8, true), - Field::new("extra", DataType::Int64, true), - ]); - - // Create a TestSource - let source = TestSource::new(); - assert!(source.schema_adapter_factory().is_none()); - - // Add a schema adapter factory - let factory = Arc::new(TestSchemaAdapterFactory {}); - let source_with_adapter = source.with_schema_adapter_factory(factory); - assert!(source_with_adapter.schema_adapter_factory().is_some()); - - // Create a schema adapter - let adapter_factory = source_with_adapter.schema_adapter_factory().unwrap(); - let adapter = - adapter_factory.create(Arc::clone(&table_schema), Arc::clone(&table_schema)); - - // Test mapping column index - assert_eq!(adapter.map_column_index(0, &file_schema), Some(0)); - assert_eq!(adapter.map_column_index(1, &file_schema), Some(1)); - - // Test creating schema mapper - let (_mapper, projection) = adapter.map_schema(&file_schema).unwrap(); - assert_eq!(projection, vec![0, 1]); -} diff --git a/datafusion/datasource-avro/src/source.rs b/datafusion/datasource-avro/src/source.rs index 2fdf34b3cc..3254f48bab 100644 --- a/datafusion/datasource-avro/src/source.rs +++ b/datafusion/datasource-avro/src/source.rs @@ -28,7 +28,6 @@ use datafusion_common::Statistics; use datafusion_datasource::file::FileSource; use datafusion_datasource::file_scan_config::FileScanConfig; use datafusion_datasource::file_stream::FileOpener; -use datafusion_datasource::impl_schema_adapter_methods; use datafusion_datasource::schema_adapter::SchemaAdapterFactory; use datafusion_physical_expr_common::sort_expr::LexOrdering; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; @@ -127,7 +126,19 @@ impl FileSource for AvroSource { Ok(None) } - impl_schema_adapter_methods!(); + fn with_schema_adapter_factory( + &self, + schema_adapter_factory: Arc<dyn SchemaAdapterFactory>, + ) -> Result<Arc<dyn FileSource>> { + Ok(Arc::new(Self { + schema_adapter_factory: Some(schema_adapter_factory), + ..self.clone() + })) + } + + fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> { + self.schema_adapter_factory.clone() + } } mod private { diff --git a/datafusion/datasource-csv/src/source.rs b/datafusion/datasource-csv/src/source.rs index d45080dc20..3af1f2b345 100644 --- a/datafusion/datasource-csv/src/source.rs +++ b/datafusion/datasource-csv/src/source.rs @@ -29,8 +29,7 @@ use datafusion_datasource::file_compression_type::FileCompressionType; use datafusion_datasource::file_meta::FileMeta; use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener}; use datafusion_datasource::{ - as_file_source, calculate_range, impl_schema_adapter_methods, FileRange, - ListingTableUrl, RangeCalculation, + as_file_source, calculate_range, FileRange, ListingTableUrl, RangeCalculation, }; use arrow::csv; @@ -284,7 +283,19 @@ impl FileSource for CsvSource { } } - impl_schema_adapter_methods!(); + fn with_schema_adapter_factory( + &self, + schema_adapter_factory: Arc<dyn SchemaAdapterFactory>, + ) -> Result<Arc<dyn FileSource>> { + Ok(Arc::new(Self { + schema_adapter_factory: Some(schema_adapter_factory), + ..self.clone() + })) + } + + fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> { + self.schema_adapter_factory.clone() + } } impl FileOpener for CsvOpener { diff --git a/datafusion/datasource-json/src/source.rs b/datafusion/datasource-json/src/source.rs index 187876522e..af37e1033e 100644 --- a/datafusion/datasource-json/src/source.rs +++ b/datafusion/datasource-json/src/source.rs @@ -32,8 +32,7 @@ use datafusion_datasource::file_meta::FileMeta; use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener}; use datafusion_datasource::schema_adapter::SchemaAdapterFactory; use datafusion_datasource::{ - as_file_source, calculate_range, impl_schema_adapter_methods, ListingTableUrl, - RangeCalculation, + as_file_source, calculate_range, ListingTableUrl, RangeCalculation, }; use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties}; @@ -151,7 +150,20 @@ impl FileSource for JsonSource { fn file_type(&self) -> &str { "json" } - impl_schema_adapter_methods!(); + + fn with_schema_adapter_factory( + &self, + schema_adapter_factory: Arc<dyn SchemaAdapterFactory>, + ) -> Result<Arc<dyn FileSource>> { + Ok(Arc::new(Self { + schema_adapter_factory: Some(schema_adapter_factory), + ..self.clone() + })) + } + + fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> { + self.schema_adapter_factory.clone() + } } impl FileOpener for JsonOpener { diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index 851e336443..647fbc8d05 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -423,7 +423,7 @@ impl FileFormat for ParquetFormat { source = source.with_metadata_size_hint(metadata_size_hint) } // Apply schema adapter factory before building the new config - let file_source = source.apply_schema_adapter(&conf); + let file_source = source.apply_schema_adapter(&conf)?; let conf = FileScanConfigBuilder::from(conf) .with_source(file_source) diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 30b774d08f..0412288d68 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -29,7 +29,6 @@ use crate::ParquetFileReaderFactory; use datafusion_common::config::ConfigOptions; use datafusion_datasource::as_file_source; use datafusion_datasource::file_stream::FileOpener; -use datafusion_datasource::impl_schema_adapter_methods; use datafusion_datasource::schema_adapter::{ DefaultSchemaAdapterFactory, SchemaAdapterFactory, }; @@ -415,7 +414,10 @@ impl ParquetSource { /// * `conf` - FileScanConfig that may contain a schema adapter factory /// # Returns /// The converted FileSource with schema adapter factory applied if provided - pub fn apply_schema_adapter(self, conf: &FileScanConfig) -> Arc<dyn FileSource> { + pub fn apply_schema_adapter( + self, + conf: &FileScanConfig, + ) -> datafusion_common::Result<Arc<dyn FileSource>> { let file_source: Arc<dyn FileSource> = self.into(); // If the FileScanConfig.file_source() has a schema adapter factory, apply it @@ -424,7 +426,7 @@ impl ParquetSource { Arc::<dyn SchemaAdapterFactory>::clone(&factory), ) } else { - file_source + Ok(file_source) } } } @@ -664,5 +666,18 @@ impl FileSource for ParquetSource { ); Ok(FilterPushdownPropagation::with_filters(filters).with_updated_node(source)) } - impl_schema_adapter_methods!(); + + fn with_schema_adapter_factory( + &self, + schema_adapter_factory: Arc<dyn SchemaAdapterFactory>, + ) -> datafusion_common::Result<Arc<dyn FileSource>> { + Ok(Arc::new(Self { + schema_adapter_factory: Some(schema_adapter_factory), + ..self.clone() + })) + } + + fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> { + self.schema_adapter_factory.clone() + } } diff --git a/datafusion/datasource-parquet/tests/apply_schema_adapter_tests.rs b/datafusion/datasource-parquet/tests/apply_schema_adapter_tests.rs index 89406fb742..955cd224e6 100644 --- a/datafusion/datasource-parquet/tests/apply_schema_adapter_tests.rs +++ b/datafusion/datasource-parquet/tests/apply_schema_adapter_tests.rs @@ -141,7 +141,7 @@ mod parquet_adapter_tests { prefix: "test_".to_string(), }); - let file_source = source.clone().with_schema_adapter_factory(factory); + let file_source = source.clone().with_schema_adapter_factory(factory).unwrap(); let config = FileScanConfigBuilder::new( ObjectStoreUrl::local_filesystem(), @@ -151,7 +151,7 @@ mod parquet_adapter_tests { .build(); // Apply schema adapter to a new source - let result_source = source.apply_schema_adapter(&config); + let result_source = source.apply_schema_adapter(&config).unwrap(); // Verify the adapter was applied assert!(result_source.schema_adapter_factory().is_some()); @@ -198,7 +198,7 @@ mod parquet_adapter_tests { .build(); // Apply schema adapter function - should pass through the source unchanged - let result_source = source.apply_schema_adapter(&config); + let result_source = source.apply_schema_adapter(&config).unwrap(); // Verify no adapter was applied assert!(result_source.schema_adapter_factory().is_none()); diff --git a/datafusion/datasource/src/file.rs b/datafusion/datasource/src/file.rs index d0557e9f08..c5f21ebf1a 100644 --- a/datafusion/datasource/src/file.rs +++ b/datafusion/datasource/src/file.rs @@ -28,7 +28,7 @@ use crate::file_stream::FileOpener; use crate::schema_adapter::SchemaAdapterFactory; use arrow::datatypes::SchemaRef; use datafusion_common::config::ConfigOptions; -use datafusion_common::{Result, Statistics}; +use datafusion_common::{not_impl_err, Result, Statistics}; use datafusion_physical_expr::{LexOrdering, PhysicalExpr}; use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; @@ -126,19 +126,26 @@ pub trait FileSource: Send + Sync { /// Set optional schema adapter factory. /// /// [`SchemaAdapterFactory`] allows user to specify how fields from the - /// file get mapped to that of the table schema. The default implementation - /// returns the original source. + /// file get mapped to that of the table schema. If you implement this + /// method, you should also implement [`schema_adapter_factory`]. /// - /// Note: You can implement this method and `schema_adapter_factory` - /// automatically using the [`crate::impl_schema_adapter_methods`] macro. + /// The default implementation returns a not implemented error. + /// + /// [`schema_adapter_factory`]: Self::schema_adapter_factory fn with_schema_adapter_factory( &self, - factory: Arc<dyn SchemaAdapterFactory>, - ) -> Arc<dyn FileSource>; + _factory: Arc<dyn SchemaAdapterFactory>, + ) -> Result<Arc<dyn FileSource>> { + not_impl_err!( + "FileSource {} does not support schema adapter factory", + self.file_type() + ) + } /// Returns the current schema adapter factory if set /// - /// Note: You can implement this method and `with_schema_adapter_factory` - /// automatically using the [`crate::impl_schema_adapter_methods`] macro. - fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>>; + /// Default implementation returns `None`. + fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> { + None + } } diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index bbc5efb4d0..b6cdd05e52 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -76,6 +76,7 @@ use log::{debug, warn}; /// # use arrow::datatypes::{Field, Fields, DataType, Schema, SchemaRef}; /// # use object_store::ObjectStore; /// # use datafusion_common::Statistics; +/// # use datafusion_common::Result; /// # use datafusion_datasource::file::FileSource; /// # use datafusion_datasource::file_groups::FileGroup; /// # use datafusion_datasource::PartitionedFile; @@ -106,9 +107,9 @@ use log::{debug, warn}; /// # fn with_projection(&self, _: &FileScanConfig) -> Arc<dyn FileSource> { unimplemented!() } /// # fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> { Arc::new(Self {projected_statistics: Some(statistics), schema_adapter_factory: self.schema_adapter_factory.clone()} ) } /// # fn metrics(&self) -> &ExecutionPlanMetricsSet { unimplemented!() } -/// # fn statistics(&self) -> datafusion_common::Result<Statistics> { Ok(self.projected_statistics.clone().expect("projected_statistics should be set")) } +/// # fn statistics(&self) -> Result<Statistics> { Ok(self.projected_statistics.clone().expect("projected_statistics should be set")) } /// # fn file_type(&self) -> &str { "parquet" } -/// # fn with_schema_adapter_factory(&self, factory: Arc<dyn SchemaAdapterFactory>) -> Arc<dyn FileSource> { Arc::new(Self {projected_statistics: self.projected_statistics.clone(), schema_adapter_factory: Some(factory)} ) } +/// # fn with_schema_adapter_factory(&self, factory: Arc<dyn SchemaAdapterFactory>) -> Result<Arc<dyn FileSource>> { Ok(Arc::new(Self {projected_statistics: self.projected_statistics.clone(), schema_adapter_factory: Some(factory)} )) } /// # fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> { self.schema_adapter_factory.clone() } /// # } /// # impl ParquetSource { diff --git a/datafusion/datasource/src/macros.rs b/datafusion/datasource/src/macros.rs deleted file mode 100644 index c7a4058f23..0000000000 --- a/datafusion/datasource/src/macros.rs +++ /dev/null @@ -1,145 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! Macros for the datafusion-datasource crate - -/// Helper macro to generate schema adapter methods for FileSource implementations -/// -/// Place this inside *any* `impl FileSource for YourType { … }` to -/// avoid copy-pasting `with_schema_adapter_factory` and -/// `schema_adapter_factory`. -/// -/// # Availability -/// -/// This macro is exported at the crate root level via `#[macro_export]`, so it can be -/// imported directly from the crate: -/// -/// ```rust,no_run -/// use datafusion_datasource::impl_schema_adapter_methods; -/// ``` -/// -/// # Note on path resolution -/// When this macro is used: -/// - `$crate` expands to `datafusion_datasource` (the crate root) -/// - `$crate::file::FileSource` refers to the FileSource trait from this crate -/// - `$crate::schema_adapter::SchemaAdapterFactory` refers to the SchemaAdapterFactory trait -/// -/// # Example Usage -/// -/// ```rust,no_run -/// use std::sync::Arc; -/// use std::any::Any; -/// use std::fmt::{Formatter, Display, self}; -/// use arrow::datatypes::SchemaRef; -/// use datafusion_common::{Result, Statistics}; -/// use object_store::ObjectStore; -/// use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; -/// use datafusion_physical_plan::DisplayFormatType; -/// use datafusion_physical_expr_common::sort_expr::LexOrdering; -/// use datafusion_datasource::file::FileSource; -/// use datafusion_datasource::file_stream::FileOpener; -/// use datafusion_datasource::file_scan_config::FileScanConfig; -/// use datafusion_datasource::impl_schema_adapter_methods; -/// use datafusion_datasource::schema_adapter::SchemaAdapterFactory; -/// -/// #[derive(Clone)] -/// struct MyFileSource { -/// schema: SchemaRef, -/// batch_size: usize, -/// statistics: Statistics, -/// projection: Option<Vec<usize>>, -/// schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>, -/// metrics: ExecutionPlanMetricsSet, -/// } -/// -/// impl FileSource for MyFileSource { -/// fn create_file_opener( -/// &self, -/// object_store: Arc<dyn ObjectStore>, -/// base_config: &FileScanConfig, -/// partition: usize, -/// ) -> Arc<dyn FileOpener> { -/// // Implementation here -/// unimplemented!() -/// } -/// -/// fn as_any(&self) -> &dyn Any { -/// self -/// } -/// -/// fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> { -/// let mut new_source = self.clone(); -/// new_source.batch_size = batch_size; -/// Arc::new(new_source) -/// } -/// -/// fn with_schema(&self, schema: SchemaRef) -> Arc<dyn FileSource> { -/// let mut new_source = self.clone(); -/// new_source.schema = schema; -/// Arc::new(new_source) -/// } -/// -/// fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource> { -/// let mut new_source = self.clone(); -/// new_source.projection = config.file_column_projection_indices(); -/// Arc::new(new_source) -/// } -/// -/// fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> { -/// let mut new_source = self.clone(); -/// new_source.statistics = statistics; -/// Arc::new(new_source) -/// } -/// -/// fn metrics(&self) -> &ExecutionPlanMetricsSet { -/// &self.metrics -/// } -/// -/// fn statistics(&self) -> Result<Statistics> { -/// Ok(self.statistics.clone()) -/// } -/// -/// fn file_type(&self) -> &str { -/// "my_file_type" -/// } -/// -/// // Use the macro to implement schema adapter methods -/// impl_schema_adapter_methods!(); -/// } -/// ``` -#[macro_export(local_inner_macros)] -macro_rules! impl_schema_adapter_methods { - () => { - fn with_schema_adapter_factory( - &self, - schema_adapter_factory: std::sync::Arc< - dyn $crate::schema_adapter::SchemaAdapterFactory, - >, - ) -> std::sync::Arc<dyn $crate::file::FileSource> { - std::sync::Arc::new(Self { - schema_adapter_factory: Some(schema_adapter_factory), - ..self.clone() - }) - } - - fn schema_adapter_factory( - &self, - ) -> Option<std::sync::Arc<dyn $crate::schema_adapter::SchemaAdapterFactory>> { - self.schema_adapter_factory.clone() - } - }; -} diff --git a/datafusion/datasource/src/mod.rs b/datafusion/datasource/src/mod.rs index 1c27cd4922..c79efd11fc 100644 --- a/datafusion/datasource/src/mod.rs +++ b/datafusion/datasource/src/mod.rs @@ -37,7 +37,6 @@ pub mod file_meta; pub mod file_scan_config; pub mod file_sink_config; pub mod file_stream; -pub mod macros; pub mod memory; pub mod schema_adapter; pub mod sink; diff --git a/datafusion/datasource/src/test_util.rs b/datafusion/datasource/src/test_util.rs index aac61c7812..e4a5114aa0 100644 --- a/datafusion/datasource/src/test_util.rs +++ b/datafusion/datasource/src/test_util.rs @@ -17,7 +17,7 @@ use crate::{ file::FileSource, file_scan_config::FileScanConfig, file_stream::FileOpener, - impl_schema_adapter_methods, schema_adapter::SchemaAdapterFactory, + schema_adapter::SchemaAdapterFactory, }; use std::sync::Arc; @@ -84,7 +84,19 @@ impl FileSource for MockSource { "mock" } - impl_schema_adapter_methods!(); + fn with_schema_adapter_factory( + &self, + schema_adapter_factory: Arc<dyn SchemaAdapterFactory>, + ) -> Result<Arc<dyn FileSource>> { + Ok(Arc::new(Self { + schema_adapter_factory: Some(schema_adapter_factory), + ..self.clone() + })) + } + + fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> { + self.schema_adapter_factory.clone() + } } /// Create a column expression --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org For additional commands, e-mail: commits-h...@datafusion.apache.org