This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
     new 900279c5d1 Implement schema adapter support for FileSource and add integration tests (#16148)
900279c5d1 is described below

commit 900279c5d189a62a5585210c64ab6868af4640e5
Author: kosiew <kos...@gmail.com>
AuthorDate: Fri May 30 19:45:58 2025 +0800

    Implement schema adapter support for FileSource and add integration tests (#16148)

    * Implement schema adapter factory support for file sources
    * Add schema adapter factory support to file sources
    * Add SchemaAdapterFactory import to file source module
    * Add schema_adapter_factory field to JsonOpener and JsonSource structs
    * Add missing import for as_file_source in source.rs
    * Fix formatting in ArrowSource implementation by removing extra newlines
    * Add integration and unit tests for schema adapter factory functionality
    * fix tests
    * Refactor adapt method signature and improve test assertions for schema adapter factory
    * Simplify constructor in TestSource by removing redundant function definition
    * Remove redundant import of SchemaAdapterFactory in util.rs
    * fix tests: refactor schema_adapter_factory methods in TestSource for improved clarity
    * feat: add macro for schema adapter methods in FileSource implementation
    * feat: use macro to implement schema adapter methods for various FileSource implementations
    * refactor: clean up unused schema adapter factory methods in ParquetSource
    * feat: add macro for generating schema adapter methods in FileSource implementations
    * refactor: re-export impl_schema_adapter_methods from crate root
    * refactor: update macro usage and documentation for schema adapter methods
    * refactor: clean up import statements in datasource module
    * refactor: reorganize and clean up import statements in util.rs
    * Resolve merge conflict
    * Export macro with local inner macros for improved encapsulation
    * fix clippy error
    * fix doc tests
    * fix CI error
    * Add metrics initialization to TestSource constructor
    * Add comment for test_multi_source_schema_adapter_reuse
    * reduce test files, move non-redundant tests, consolidate in one file
    * test_schema_adapter - add comments
    * remove redundant tests
    * Refactor schema adapter application to use ParquetSource method directly
    * Refactor apply_schema_adapter usage to call method directly on ParquetSource
    * remove macro
    * Revert "remove macro"

      This reverts commit 208b1cc996bfc77171ce5eaef50ca3fd296c9d40.

    * FileSource - provide default implementations for schema_adapter_factory methods
    * Revert "FileSource - provide default implementations for schema_adapter_factory methods"

      This reverts commit ee07b69fbcc0a41bd2a859f5f0fa328b35d4e69d.

    * Remove unused import of SchemaAdapterFactory from file_format.rs
    * Refactor imports in apply_schema_adapter_tests.rs for improved readability
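To make the patch below easier to follow, here is a minimal sketch of the user-facing API this commit adds, pieced together from the tests in the diff. It is an illustrative example, not part of the patch; the `attach_default_adapter` helper name is hypothetical, and it assumes `DefaultSchemaAdapterFactory` is the unit-struct factory imported from `datafusion_datasource::schema_adapter` as shown in the diff.

```rust
use std::sync::Arc;

use datafusion_datasource::file::FileSource;
use datafusion_datasource::schema_adapter::DefaultSchemaAdapterFactory;
use datafusion_datasource_parquet::source::ParquetSource;

// Hypothetical helper showing the two new trait methods on `FileSource`.
fn attach_default_adapter() -> Arc<dyn FileSource> {
    // `with_schema_adapter_factory` is now part of the `FileSource` trait,
    // so the same call works for Parquet, CSV, JSON, Avro, and Arrow sources.
    let source: Arc<dyn FileSource> = ParquetSource::default()
        .with_schema_adapter_factory(Arc::new(DefaultSchemaAdapterFactory));

    // The matching accessor reads the factory back.
    assert!(source.schema_adapter_factory().is_some());
    source
}
```

Custom sources opt in by implementing the same two methods, which the new `impl_schema_adapter_methods!()` macro in `datafusion/datasource/src/macros.rs` can generate; its documentation in the diff below includes a full worked example.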
---
 datafusion/core/src/datasource/mod.rs              |  39 ++--
 .../src/datasource/physical_plan/arrow_file.rs     |  11 +
 .../schema_adapter_integration_tests.rs            | 260 +++++++++++++++++++++
 .../physical_optimizer/filter_pushdown/util.rs     |  50 ++--
 datafusion/core/tests/test_adapter_updated.rs      | 214 +++++++++++++++++
 datafusion/datasource-avro/src/source.rs           |   5 +
 datafusion/datasource-csv/src/source.rs            |  13 +-
 datafusion/datasource-json/src/source.rs           |  14 +-
 datafusion/datasource-parquet/src/file_format.rs   |   4 +-
 datafusion/datasource-parquet/src/source.rs        |  49 ++--
 .../tests/apply_schema_adapter_tests.rs            | 206 ++++++++++++++++
 datafusion/datasource/src/file.rs                  |  25 ++
 datafusion/datasource/src/file_scan_config.rs      |  12 +-
 datafusion/datasource/src/macros.rs                | 145 ++++++++++++
 datafusion/datasource/src/mod.rs                   |   2 +
 datafusion/datasource/src/test_util.rs             |   4 +
 16 files changed, 975 insertions(+), 78 deletions(-)

diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs index 674541ff73..f0c6771515 100644 --- a/datafusion/core/src/datasource/mod.rs +++ b/datafusion/core/src/datasource/mod.rs @@ -51,28 +51,26 @@ pub use datafusion_physical_expr::create_ordering; #[cfg(all(test, feature = "parquet"))] mod tests { - use crate::prelude::SessionContext; - - use std::fs; - use std::sync::Arc; - - use arrow::array::{Int32Array, StringArray}; - use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; - use arrow::record_batch::RecordBatch; - use datafusion_common::test_util::batches_to_sort_string; - use datafusion_datasource::file_scan_config::FileScanConfigBuilder; use datafusion_datasource::schema_adapter::{ DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory, SchemaMapper, }; - use datafusion_datasource::PartitionedFile; - use datafusion_datasource_parquet::source::ParquetSource; - - use datafusion_common::record_batch; - use ::object_store::path::Path; - use ::object_store::ObjectMeta; - use datafusion_datasource::source::DataSourceExec; + use crate::prelude::SessionContext; + use arrow::{ + array::{Int32Array, StringArray}, + datatypes::{DataType, Field, Schema, SchemaRef}, + record_batch::RecordBatch, + }; + use datafusion_common::{record_batch, test_util::batches_to_sort_string}; + use datafusion_datasource::{ + file::FileSource, file_scan_config::FileScanConfigBuilder, + source::DataSourceExec, PartitionedFile, + }; + use datafusion_datasource_parquet::source::ParquetSource; + use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_physical_plan::collect; + use object_store::{path::Path, ObjectMeta}; + use std::{fs, sync::Arc}; use tempfile::TempDir; #[tokio::test] @@ -81,7 +79,6 @@ mod tests { // record batches returned from parquet. This can be useful for schema evolution // where older files may not have all columns.
- use datafusion_execution::object_store::ObjectStoreUrl; let tmp_dir = TempDir::new().unwrap(); let table_dir = tmp_dir.path().join("parquet_test"); fs::DirBuilder::new().create(table_dir.as_path()).unwrap(); @@ -124,10 +121,8 @@ mod tests { let f2 = Field::new("extra_column", DataType::Utf8, true); let schema = Arc::new(Schema::new(vec![f1.clone(), f2.clone()])); - let source = Arc::new( - ParquetSource::default() - .with_schema_adapter_factory(Arc::new(TestSchemaAdapterFactory {})), - ); + let source = ParquetSource::default() + .with_schema_adapter_factory(Arc::new(TestSchemaAdapterFactory {})); let base_conf = FileScanConfigBuilder::new( ObjectStoreUrl::local_filesystem(), schema, diff --git a/datafusion/core/src/datasource/physical_plan/arrow_file.rs b/datafusion/core/src/datasource/physical_plan/arrow_file.rs index 897d1c0447..6de72aa8ff 100644 --- a/datafusion/core/src/datasource/physical_plan/arrow_file.rs +++ b/datafusion/core/src/datasource/physical_plan/arrow_file.rs @@ -20,6 +20,8 @@ use std::sync::Arc; use crate::datasource::physical_plan::{FileMeta, FileOpenFuture, FileOpener}; use crate::error::Result; +use datafusion_datasource::schema_adapter::SchemaAdapterFactory; +use datafusion_datasource::{as_file_source, impl_schema_adapter_methods}; use arrow::buffer::Buffer; use arrow::datatypes::SchemaRef; @@ -39,6 +41,13 @@ use object_store::{GetOptions, GetRange, GetResultPayload, ObjectStore}; pub struct ArrowSource { metrics: ExecutionPlanMetricsSet, projected_statistics: Option<Statistics>, + schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>, +} + +impl From<ArrowSource> for Arc<dyn FileSource> { + fn from(source: ArrowSource) -> Self { + as_file_source(source) + } } impl FileSource for ArrowSource { @@ -89,6 +98,8 @@ impl FileSource for ArrowSource { fn file_type(&self) -> &str { "arrow" } + + impl_schema_adapter_methods!(); } /// The struct arrow that implements `[FileOpener]` trait diff --git a/datafusion/core/tests/integration_tests/schema_adapter_integration_tests.rs b/datafusion/core/tests/integration_tests/schema_adapter_integration_tests.rs new file mode 100644 index 0000000000..38c2ee582a --- /dev/null +++ b/datafusion/core/tests/integration_tests/schema_adapter_integration_tests.rs @@ -0,0 +1,260 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! 
Integration test for schema adapter factory functionality + +use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use arrow::record_batch::RecordBatch; +use datafusion::datasource::object_store::ObjectStoreUrl; +use datafusion::datasource::physical_plan::arrow_file::ArrowSource; +use datafusion::prelude::*; +use datafusion_common::Result; +use datafusion_datasource::file::FileSource; +use datafusion_datasource::file_scan_config::FileScanConfigBuilder; +use datafusion_datasource::schema_adapter::{SchemaAdapter, SchemaAdapterFactory}; +use datafusion_datasource::source::DataSourceExec; +use datafusion_datasource::PartitionedFile; +use std::sync::Arc; +use tempfile::TempDir; + +#[cfg(feature = "parquet")] +use datafusion_datasource_parquet::ParquetSource; +#[cfg(feature = "parquet")] +use parquet::arrow::ArrowWriter; +#[cfg(feature = "parquet")] +use parquet::file::properties::WriterProperties; + +#[cfg(feature = "csv")] +use datafusion_datasource_csv::CsvSource; + +/// A schema adapter factory that transforms column names to uppercase +#[derive(Debug)] +struct UppercaseAdapterFactory {} + +impl SchemaAdapterFactory for UppercaseAdapterFactory { + fn create(&self, schema: &Schema) -> Result<Box<dyn SchemaAdapter>> { + Ok(Box::new(UppercaseAdapter { + input_schema: Arc::new(schema.clone()), + })) + } +} + +/// Schema adapter that transforms column names to uppercase +#[derive(Debug)] +struct UppercaseAdapter { + input_schema: SchemaRef, +} + +impl SchemaAdapter for UppercaseAdapter { + fn adapt(&self, record_batch: RecordBatch) -> Result<RecordBatch> { + // In a real adapter, we might transform the data too + // For this test, we're just passing through the batch + Ok(record_batch) + } + + fn output_schema(&self) -> SchemaRef { + let fields = self + .input_schema + .fields() + .iter() + .map(|f| { + Field::new( + f.name().to_uppercase().as_str(), + f.data_type().clone(), + f.is_nullable(), + ) + }) + .collect(); + + Arc::new(Schema::new(fields)) + } +} + +#[cfg(feature = "parquet")] +#[tokio::test] +async fn test_parquet_integration_with_schema_adapter() -> Result<()> { + // Create a temporary directory for our test file + let tmp_dir = TempDir::new()?; + let file_path = tmp_dir.path().join("test.parquet"); + let file_path_str = file_path.to_str().unwrap(); + + // Create test data + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("name", DataType::Utf8, true), + ])); + + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(arrow::array::Int32Array::from(vec![1, 2, 3])), + Arc::new(arrow::array::StringArray::from(vec!["a", "b", "c"])), + ], + )?; + + // Write test parquet file + let file = std::fs::File::create(file_path_str)?; + let props = WriterProperties::builder().build(); + let mut writer = ArrowWriter::try_new(file, schema.clone(), Some(props))?; + writer.write(&batch)?; + writer.close()?; + + // Create a session context + let ctx = SessionContext::new(); + + // Create a ParquetSource with the adapter factory + let source = ParquetSource::default() + .with_schema_adapter_factory(Arc::new(UppercaseAdapterFactory {})); + + // Create a scan config + let config = FileScanConfigBuilder::new( + ObjectStoreUrl::parse(&format!("file://{}", file_path_str))?, + schema.clone(), + ) + .with_source(source) + .build(); + + // Create a data source executor + let exec = DataSourceExec::from_data_source(config); + + // Collect results + let task_ctx = ctx.task_ctx(); + let stream = exec.execute(0, task_ctx)?; + let batches = 
datafusion::physical_plan::common::collect(stream).await?; + + // There should be one batch + assert_eq!(batches.len(), 1); + + // Verify the schema has uppercase column names + let result_schema = batches[0].schema(); + assert_eq!(result_schema.field(0).name(), "ID"); + assert_eq!(result_schema.field(1).name(), "NAME"); + + Ok(()) +} + +#[tokio::test] +async fn test_multi_source_schema_adapter_reuse() -> Result<()> { + // This test verifies that the same schema adapter factory can be reused + // across different file source types. This is important for ensuring that: + // 1. The schema adapter factory interface works uniformly across all source types + // 2. The factory can be shared and cloned efficiently using Arc + // 3. Various data source implementations correctly implement the schema adapter factory pattern + + // Create a test factory + let factory = Arc::new(UppercaseAdapterFactory {}); + + // Apply the same adapter to different source types + let arrow_source = + ArrowSource::default().with_schema_adapter_factory(factory.clone()); + + #[cfg(feature = "parquet")] + let parquet_source = + ParquetSource::default().with_schema_adapter_factory(factory.clone()); + + #[cfg(feature = "csv")] + let csv_source = CsvSource::default().with_schema_adapter_factory(factory.clone()); + + // Verify adapters were properly set + assert!(arrow_source.schema_adapter_factory().is_some()); + + #[cfg(feature = "parquet")] + assert!(parquet_source.schema_adapter_factory().is_some()); + + #[cfg(feature = "csv")] + assert!(csv_source.schema_adapter_factory().is_some()); + + Ok(()) +} + +// Helper function to test From<T> for Arc<dyn FileSource> implementations +fn test_from_impl<T: Into<Arc<dyn FileSource>> + Default>(expected_file_type: &str) { + let source = T::default(); + let file_source: Arc<dyn FileSource> = source.into(); + assert_eq!(file_source.file_type(), expected_file_type); +} + +#[test] +fn test_from_implementations() { + // Test From implementation for various sources + test_from_impl::<ArrowSource>("arrow"); + + #[cfg(feature = "parquet")] + test_from_impl::<ParquetSource>("parquet"); + + #[cfg(feature = "csv")] + test_from_impl::<CsvSource>("csv"); + + #[cfg(feature = "json")] + test_from_impl::<datafusion_datasource_json::JsonSource>("json"); +} + +/// A simple test schema adapter factory that doesn't modify the schema +#[derive(Debug)] +struct TestSchemaAdapterFactory {} + +impl SchemaAdapterFactory for TestSchemaAdapterFactory { + fn create(&self, schema: &Schema) -> Result<Box<dyn SchemaAdapter>> { + Ok(Box::new(TestSchemaAdapter { + input_schema: Arc::new(schema.clone()), + })) + } +} + +/// A test schema adapter that passes through data unmodified +#[derive(Debug)] +struct TestSchemaAdapter { + input_schema: SchemaRef, +} + +impl SchemaAdapter for TestSchemaAdapter { + fn adapt(&self, record_batch: RecordBatch) -> Result<RecordBatch> { + // Just pass through the batch unmodified + Ok(record_batch) + } + + fn output_schema(&self) -> SchemaRef { + self.input_schema.clone() + } +} + +#[cfg(feature = "parquet")] +#[test] +fn test_schema_adapter_preservation() { + // Create a test schema + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("name", DataType::Utf8, true), + ])); + + // Create source with schema adapter factory + let source = ParquetSource::default(); + let factory = Arc::new(TestSchemaAdapterFactory {}); + let file_source = source.with_schema_adapter_factory(factory); + + // Create a FileScanConfig with the source + let 
config_builder = + FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), schema.clone()) + .with_source(file_source.clone()) + // Add a file to make it valid + .with_file(PartitionedFile::new("test.parquet", 100)); + + let config = config_builder.build(); + + // Verify the schema adapter factory is present in the file source + assert!(config.source().schema_adapter_factory().is_some()); +} diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs index 393322a7f3..dc4d77194c 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs @@ -15,50 +15,41 @@ // specific language governing permissions and limitations // under the License. -use std::{ - any::Any, - fmt::{Display, Formatter}, -}; -use std::{ - pin::Pin, - sync::Arc, - task::{Context, Poll}, -}; - use arrow::datatypes::SchemaRef; use arrow::error::ArrowError; use arrow::{array::RecordBatch, compute::concat_batches}; use datafusion::{datasource::object_store::ObjectStoreUrl, physical_plan::PhysicalExpr}; -use datafusion_common::{config::ConfigOptions, Statistics}; -use datafusion_common::{internal_err, Result}; -use datafusion_datasource::file_scan_config::FileScanConfigBuilder; -use datafusion_datasource::file_stream::FileOpenFuture; -use datafusion_datasource::source::DataSourceExec; +use datafusion_common::{config::ConfigOptions, internal_err, Result, Statistics}; use datafusion_datasource::{ - file::FileSource, file_scan_config::FileScanConfig, file_stream::FileOpener, -}; -use datafusion_datasource::{ - file_meta::FileMeta, schema_adapter::DefaultSchemaAdapterFactory, PartitionedFile, + file::FileSource, file_meta::FileMeta, file_scan_config::FileScanConfig, + file_scan_config::FileScanConfigBuilder, file_stream::FileOpenFuture, + file_stream::FileOpener, impl_schema_adapter_methods, + schema_adapter::DefaultSchemaAdapterFactory, schema_adapter::SchemaAdapterFactory, + source::DataSourceExec, PartitionedFile, }; use datafusion_physical_expr::conjunction; use datafusion_physical_expr_common::physical_expr::fmt_sql; use datafusion_physical_optimizer::PhysicalOptimizerRule; use datafusion_physical_plan::{ - displayable, metrics::ExecutionPlanMetricsSet, DisplayFormatType, ExecutionPlan, -}; -use datafusion_physical_plan::{ + displayable, filter::FilterExec, filter_pushdown::{ ChildPushdownResult, FilterDescription, FilterPushdownPropagation, PredicateSupport, PredicateSupports, }, - DisplayAs, PlanProperties, + metrics::ExecutionPlanMetricsSet, + DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, }; - use futures::stream::BoxStream; use futures::{FutureExt, Stream}; use object_store::ObjectStore; - +use std::{ + any::Any, + fmt::{Display, Formatter}, + pin::Pin, + sync::Arc, + task::{Context, Poll}, +}; pub struct TestOpener { batches: Vec<RecordBatch>, batch_size: Option<usize>, @@ -119,19 +110,16 @@ pub struct TestSource { schema: Option<SchemaRef>, metrics: ExecutionPlanMetricsSet, projection: Option<Vec<usize>>, + schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>, } impl TestSource { fn new(support: bool, batches: Vec<RecordBatch>) -> Self { Self { support, - predicate: None, - statistics: None, - batch_size: None, - schema: None, - projection: None, metrics: ExecutionPlanMetricsSet::new(), batches, + ..Default::default() } } } @@ -243,6 +231,8 @@ impl FileSource for TestSource { 
Ok(FilterPushdownPropagation::unsupported(filters)) } } + + impl_schema_adapter_methods!(); } #[derive(Debug, Clone)] diff --git a/datafusion/core/tests/test_adapter_updated.rs b/datafusion/core/tests/test_adapter_updated.rs new file mode 100644 index 0000000000..c85b9a3447 --- /dev/null +++ b/datafusion/core/tests/test_adapter_updated.rs @@ -0,0 +1,214 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use arrow::record_batch::RecordBatch; +use datafusion_common::{ColumnStatistics, DataFusionError, Result, Statistics}; +use datafusion_datasource::file::FileSource; +use datafusion_datasource::file_scan_config::FileScanConfig; +use datafusion_datasource::file_stream::FileOpener; +use datafusion_datasource::schema_adapter::{ + SchemaAdapter, SchemaAdapterFactory, SchemaMapper, +}; +use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; +use object_store::ObjectStore; +use std::any::Any; +use std::fmt::Debug; +use std::sync::Arc; + +/// A test source for testing schema adapters +#[derive(Debug, Clone)] +struct TestSource { + schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>, +} + +impl TestSource { + fn new() -> Self { + Self { + schema_adapter_factory: None, + } + } +} + +impl FileSource for TestSource { + fn file_type(&self) -> &str { + "test" + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn create_file_opener( + &self, + _store: Arc<dyn ObjectStore>, + _conf: &FileScanConfig, + _index: usize, + ) -> Arc<dyn FileOpener> { + unimplemented!("Not needed for this test") + } + + fn with_batch_size(&self, _batch_size: usize) -> Arc<dyn FileSource> { + Arc::new(self.clone()) + } + + fn with_schema(&self, _schema: SchemaRef) -> Arc<dyn FileSource> { + Arc::new(self.clone()) + } + + fn with_projection(&self, _projection: &FileScanConfig) -> Arc<dyn FileSource> { + Arc::new(self.clone()) + } + + fn with_statistics(&self, _statistics: Statistics) -> Arc<dyn FileSource> { + Arc::new(self.clone()) + } + + fn metrics(&self) -> &ExecutionPlanMetricsSet { + unimplemented!("Not needed for this test") + } + + fn statistics(&self) -> Result<Statistics, DataFusionError> { + Ok(Statistics::default()) + } + + fn with_schema_adapter_factory( + &self, + schema_adapter_factory: Arc<dyn SchemaAdapterFactory>, + ) -> Arc<dyn FileSource> { + Arc::new(Self { + schema_adapter_factory: Some(schema_adapter_factory), + }) + } + + fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> { + self.schema_adapter_factory.clone() + } +} + +/// A test schema adapter factory +#[derive(Debug)] +struct TestSchemaAdapterFactory {} + +impl SchemaAdapterFactory for TestSchemaAdapterFactory { + fn create( + &self, + projected_table_schema: SchemaRef, + _table_schema: SchemaRef, + 
) -> Box<dyn SchemaAdapter> { + Box::new(TestSchemaAdapter { + table_schema: projected_table_schema, + }) + } +} + +/// A test schema adapter implementation +#[derive(Debug)] +struct TestSchemaAdapter { + table_schema: SchemaRef, +} + +impl SchemaAdapter for TestSchemaAdapter { + fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> { + let field = self.table_schema.field(index); + file_schema.fields.find(field.name()).map(|(i, _)| i) + } + + fn map_schema( + &self, + file_schema: &Schema, + ) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> { + let mut projection = Vec::with_capacity(file_schema.fields().len()); + for (file_idx, file_field) in file_schema.fields().iter().enumerate() { + if self.table_schema.fields().find(file_field.name()).is_some() { + projection.push(file_idx); + } + } + + Ok((Arc::new(TestSchemaMapping {}), projection)) + } +} + +/// A test schema mapper implementation +#[derive(Debug)] +struct TestSchemaMapping {} + +impl SchemaMapper for TestSchemaMapping { + fn map_batch(&self, batch: RecordBatch) -> Result<RecordBatch> { + // For testing, just return the original batch + Ok(batch) + } + + fn map_column_statistics( + &self, + stats: &[ColumnStatistics], + ) -> Result<Vec<ColumnStatistics>> { + // For testing, just return the input statistics + Ok(stats.to_vec()) + } +} + +#[test] +fn test_schema_adapter() { + // This test verifies the functionality of the SchemaAdapter and SchemaAdapterFactory + // components used in DataFusion's file sources. + // + // The test specifically checks: + // 1. Creating and attaching a schema adapter factory to a file source + // 2. Creating a schema adapter using the factory + // 3. The schema adapter's ability to map column indices between a table schema and a file schema + // 4. The schema adapter's ability to create a projection that selects only the columns + // from the file schema that are present in the table schema + // + // Schema adapters are used when the schema of data in files doesn't exactly match + // the schema expected by the query engine, allowing for field mapping and data transformation. 
+ + // Create a test schema + let table_schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("name", DataType::Utf8, true), + ])); + + // Create a file schema + let file_schema = Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("name", DataType::Utf8, true), + Field::new("extra", DataType::Int64, true), + ]); + + // Create a TestSource + let source = TestSource::new(); + assert!(source.schema_adapter_factory().is_none()); + + // Add a schema adapter factory + let factory = Arc::new(TestSchemaAdapterFactory {}); + let source_with_adapter = source.with_schema_adapter_factory(factory); + assert!(source_with_adapter.schema_adapter_factory().is_some()); + + // Create a schema adapter + let adapter_factory = source_with_adapter.schema_adapter_factory().unwrap(); + let adapter = + adapter_factory.create(Arc::clone(&table_schema), Arc::clone(&table_schema)); + + // Test mapping column index + assert_eq!(adapter.map_column_index(0, &file_schema), Some(0)); + assert_eq!(adapter.map_column_index(1, &file_schema), Some(1)); + + // Test creating schema mapper + let (_mapper, projection) = adapter.map_schema(&file_schema).unwrap(); + assert_eq!(projection, vec![0, 1]); +} diff --git a/datafusion/datasource-avro/src/source.rs b/datafusion/datasource-avro/src/source.rs index abd049a50e..2fdf34b3cc 100644 --- a/datafusion/datasource-avro/src/source.rs +++ b/datafusion/datasource-avro/src/source.rs @@ -28,6 +28,8 @@ use datafusion_common::Statistics; use datafusion_datasource::file::FileSource; use datafusion_datasource::file_scan_config::FileScanConfig; use datafusion_datasource::file_stream::FileOpener; +use datafusion_datasource::impl_schema_adapter_methods; +use datafusion_datasource::schema_adapter::SchemaAdapterFactory; use datafusion_physical_expr_common::sort_expr::LexOrdering; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; @@ -41,6 +43,7 @@ pub struct AvroSource { projection: Option<Vec<String>>, metrics: ExecutionPlanMetricsSet, projected_statistics: Option<Statistics>, + schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>, } impl AvroSource { @@ -123,6 +126,8 @@ impl FileSource for AvroSource { ) -> Result<Option<FileScanConfig>> { Ok(None) } + + impl_schema_adapter_methods!(); } mod private { diff --git a/datafusion/datasource-csv/src/source.rs b/datafusion/datasource-csv/src/source.rs index cbadb5dd91..d45080dc20 100644 --- a/datafusion/datasource-csv/src/source.rs +++ b/datafusion/datasource-csv/src/source.rs @@ -17,6 +17,7 @@ //! 
Execution plan for reading CSV files +use datafusion_datasource::schema_adapter::SchemaAdapterFactory; use std::any::Any; use std::fmt; use std::io::{Read, Seek, SeekFrom}; @@ -28,7 +29,8 @@ use datafusion_datasource::file_compression_type::FileCompressionType; use datafusion_datasource::file_meta::FileMeta; use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener}; use datafusion_datasource::{ - calculate_range, FileRange, ListingTableUrl, RangeCalculation, + as_file_source, calculate_range, impl_schema_adapter_methods, FileRange, + ListingTableUrl, RangeCalculation, }; use arrow::csv; @@ -91,6 +93,7 @@ pub struct CsvSource { comment: Option<u8>, metrics: ExecutionPlanMetricsSet, projected_statistics: Option<Statistics>, + schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>, } impl CsvSource { @@ -212,6 +215,12 @@ impl CsvOpener { } } +impl From<CsvSource> for Arc<dyn FileSource> { + fn from(source: CsvSource) -> Self { + as_file_source(source) + } +} + impl FileSource for CsvSource { fn create_file_opener( &self, @@ -274,6 +283,8 @@ impl FileSource for CsvSource { DisplayFormatType::TreeRender => Ok(()), } } + + impl_schema_adapter_methods!(); } impl FileOpener for CsvOpener { diff --git a/datafusion/datasource-json/src/source.rs b/datafusion/datasource-json/src/source.rs index 982b799556..187876522e 100644 --- a/datafusion/datasource-json/src/source.rs +++ b/datafusion/datasource-json/src/source.rs @@ -30,7 +30,11 @@ use datafusion_datasource::decoder::{deserialize_stream, DecoderDeserializer}; use datafusion_datasource::file_compression_type::FileCompressionType; use datafusion_datasource::file_meta::FileMeta; use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener}; -use datafusion_datasource::{calculate_range, ListingTableUrl, RangeCalculation}; +use datafusion_datasource::schema_adapter::SchemaAdapterFactory; +use datafusion_datasource::{ + as_file_source, calculate_range, impl_schema_adapter_methods, ListingTableUrl, + RangeCalculation, +}; use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties}; use arrow::json::ReaderBuilder; @@ -77,6 +81,7 @@ pub struct JsonSource { batch_size: Option<usize>, metrics: ExecutionPlanMetricsSet, projected_statistics: Option<Statistics>, + schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>, } impl JsonSource { @@ -86,6 +91,12 @@ impl JsonSource { } } +impl From<JsonSource> for Arc<dyn FileSource> { + fn from(source: JsonSource) -> Self { + as_file_source(source) + } +} + impl FileSource for JsonSource { fn create_file_opener( &self, @@ -140,6 +151,7 @@ impl FileSource for JsonSource { fn file_type(&self) -> &str { "json" } + impl_schema_adapter_methods!(); } impl FileOpener for JsonOpener { diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index 253bd8872d..851e336443 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -422,9 +422,11 @@ impl FileFormat for ParquetFormat { if let Some(metadata_size_hint) = metadata_size_hint { source = source.with_metadata_size_hint(metadata_size_hint) } + // Apply schema adapter factory before building the new config + let file_source = source.apply_schema_adapter(&conf); let conf = FileScanConfigBuilder::from(conf) - .with_source(Arc::new(source)) + .with_source(file_source) .build(); Ok(DataSourceExec::from_data_source(conf)) } diff --git a/datafusion/datasource-parquet/src/source.rs 
b/datafusion/datasource-parquet/src/source.rs index 69347f440c..30b774d08f 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -27,7 +27,9 @@ use crate::row_filter::can_expr_be_pushed_down_with_schemas; use crate::DefaultParquetFileReaderFactory; use crate::ParquetFileReaderFactory; use datafusion_common::config::ConfigOptions; +use datafusion_datasource::as_file_source; use datafusion_datasource::file_stream::FileOpener; +use datafusion_datasource::impl_schema_adapter_methods; use datafusion_datasource::schema_adapter::{ DefaultSchemaAdapterFactory, SchemaAdapterFactory, }; @@ -49,7 +51,6 @@ use datafusion_physical_plan::DisplayFormatType; use itertools::Itertools; use object_store::ObjectStore; - /// Execution plan for reading one or more Parquet files. /// /// ```text @@ -343,25 +344,6 @@ impl ParquetSource { self } - /// return the optional schema adapter factory - pub fn schema_adapter_factory(&self) -> Option<&Arc<dyn SchemaAdapterFactory>> { - self.schema_adapter_factory.as_ref() - } - - /// Set optional schema adapter factory. - /// - /// [`SchemaAdapterFactory`] allows user to specify how fields from the - /// parquet file get mapped to that of the table schema. The default schema - /// adapter uses arrow's cast library to map the parquet fields to the table - /// schema. - pub fn with_schema_adapter_factory( - mut self, - schema_adapter_factory: Arc<dyn SchemaAdapterFactory>, - ) -> Self { - self.schema_adapter_factory = Some(schema_adapter_factory); - self - } - /// If true, the predicate will be used during the parquet scan. /// Defaults to false /// @@ -426,6 +408,25 @@ impl ParquetSource { fn bloom_filter_on_read(&self) -> bool { self.table_parquet_options.global.bloom_filter_on_read } + + /// Applies schema adapter factory from the FileScanConfig if present. + /// + /// # Arguments + /// * `conf` - FileScanConfig that may contain a schema adapter factory + /// # Returns + /// The converted FileSource with schema adapter factory applied if provided + pub fn apply_schema_adapter(self, conf: &FileScanConfig) -> Arc<dyn FileSource> { + let file_source: Arc<dyn FileSource> = self.into(); + + // If the FileScanConfig.file_source() has a schema adapter factory, apply it + if let Some(factory) = conf.file_source().schema_adapter_factory() { + file_source.with_schema_adapter_factory( + Arc::<dyn SchemaAdapterFactory>::clone(&factory), + ) + } else { + file_source + } + } } /// Parses datafusion.common.config.ParquetOptions.coerce_int96 String to a arrow_schema.datatype.TimeUnit @@ -446,6 +447,13 @@ pub(crate) fn parse_coerce_int96_string( } } +/// Allows easy conversion from ParquetSource to Arc<dyn FileSource> +impl From<ParquetSource> for Arc<dyn FileSource> { + fn from(source: ParquetSource) -> Self { + as_file_source(source) + } +} + impl FileSource for ParquetSource { fn create_file_opener( &self, @@ -656,4 +664,5 @@ impl FileSource for ParquetSource { ); Ok(FilterPushdownPropagation::with_filters(filters).with_updated_node(source)) } + impl_schema_adapter_methods!(); } diff --git a/datafusion/datasource-parquet/tests/apply_schema_adapter_tests.rs b/datafusion/datasource-parquet/tests/apply_schema_adapter_tests.rs new file mode 100644 index 0000000000..89406fb742 --- /dev/null +++ b/datafusion/datasource-parquet/tests/apply_schema_adapter_tests.rs @@ -0,0 +1,206 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod parquet_adapter_tests { + use arrow::{ + datatypes::{DataType, Field, Schema, SchemaRef}, + record_batch::RecordBatch, + }; + use datafusion_common::{ColumnStatistics, DataFusionError, Result}; + use datafusion_datasource::{ + file::FileSource, + file_scan_config::FileScanConfigBuilder, + schema_adapter::{SchemaAdapter, SchemaAdapterFactory, SchemaMapper}, + }; + use datafusion_datasource_parquet::source::ParquetSource; + use datafusion_execution::object_store::ObjectStoreUrl; + use std::{fmt::Debug, sync::Arc}; + + /// A test schema adapter factory that adds prefix to column names + #[derive(Debug)] + struct PrefixAdapterFactory { + prefix: String, + } + + impl SchemaAdapterFactory for PrefixAdapterFactory { + fn create( + &self, + projected_table_schema: SchemaRef, + _table_schema: SchemaRef, + ) -> Box<dyn SchemaAdapter> { + Box::new(PrefixAdapter { + input_schema: projected_table_schema, + prefix: self.prefix.clone(), + }) + } + } + + /// A test schema adapter that adds prefix to column names + #[derive(Debug)] + struct PrefixAdapter { + input_schema: SchemaRef, + prefix: String, + } + + impl SchemaAdapter for PrefixAdapter { + fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> { + let field = self.input_schema.field(index); + file_schema.fields.find(field.name()).map(|(i, _)| i) + } + + fn map_schema( + &self, + file_schema: &Schema, + ) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> { + let mut projection = Vec::with_capacity(file_schema.fields().len()); + for (file_idx, file_field) in file_schema.fields().iter().enumerate() { + if self.input_schema.fields().find(file_field.name()).is_some() { + projection.push(file_idx); + } + } + + // Create a schema mapper that adds a prefix to column names + #[derive(Debug)] + struct PrefixSchemaMapping { + // Keep only the prefix field which is actually used in the implementation + prefix: String, + } + + impl SchemaMapper for PrefixSchemaMapping { + fn map_batch(&self, batch: RecordBatch) -> Result<RecordBatch> { + // Create a new schema with prefixed field names + let prefixed_fields: Vec<Field> = batch + .schema() + .fields() + .iter() + .map(|field| { + Field::new( + format!("{}{}", self.prefix, field.name()), + field.data_type().clone(), + field.is_nullable(), + ) + }) + .collect(); + let prefixed_schema = Arc::new(Schema::new(prefixed_fields)); + + // Create a new batch with the prefixed schema but the same data + let options = arrow::record_batch::RecordBatchOptions::default(); + RecordBatch::try_new_with_options( + prefixed_schema, + batch.columns().to_vec(), + &options, + ) + .map_err(|e| DataFusionError::ArrowError(e, None)) + } + + fn map_column_statistics( + &self, + stats: &[ColumnStatistics], + ) -> Result<Vec<ColumnStatistics>> { + // For testing, just return the input statistics + Ok(stats.to_vec()) + } 
+ } + + Ok(( + Arc::new(PrefixSchemaMapping { + prefix: self.prefix.clone(), + }), + projection, + )) + } + } + + #[test] + fn test_apply_schema_adapter_with_factory() { + // Create a schema + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("name", DataType::Utf8, true), + ])); + + // Create a parquet source + let source = ParquetSource::default(); + + // Create a file scan config with source that has a schema adapter factory + let factory = Arc::new(PrefixAdapterFactory { + prefix: "test_".to_string(), + }); + + let file_source = source.clone().with_schema_adapter_factory(factory); + + let config = FileScanConfigBuilder::new( + ObjectStoreUrl::local_filesystem(), + schema.clone(), + file_source, + ) + .build(); + + // Apply schema adapter to a new source + let result_source = source.apply_schema_adapter(&config); + + // Verify the adapter was applied + assert!(result_source.schema_adapter_factory().is_some()); + + // Create adapter and test it produces expected schema + let adapter_factory = result_source.schema_adapter_factory().unwrap(); + let adapter = adapter_factory.create(schema.clone(), schema.clone()); + + // Create a dummy batch to test the schema mapping + let dummy_batch = RecordBatch::new_empty(schema.clone()); + + // Get the file schema (which is the same as the table schema in this test) + let (mapper, _) = adapter.map_schema(&schema).unwrap(); + + // Apply the mapping to get the output schema + let mapped_batch = mapper.map_batch(dummy_batch).unwrap(); + let output_schema = mapped_batch.schema(); + + // Check the column names have the prefix + assert_eq!(output_schema.field(0).name(), "test_id"); + assert_eq!(output_schema.field(1).name(), "test_name"); + } + + #[test] + fn test_apply_schema_adapter_without_factory() { + // Create a schema + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("name", DataType::Utf8, true), + ])); + + // Create a parquet source + let source = ParquetSource::default(); + + // Convert to Arc<dyn FileSource> + let file_source: Arc<dyn FileSource> = Arc::new(source.clone()); + + // Create a file scan config without a schema adapter factory + let config = FileScanConfigBuilder::new( + ObjectStoreUrl::local_filesystem(), + schema.clone(), + file_source, + ) + .build(); + + // Apply schema adapter function - should pass through the source unchanged + let result_source = source.apply_schema_adapter(&config); + + // Verify no adapter was applied + assert!(result_source.schema_adapter_factory().is_none()); + } +} diff --git a/datafusion/datasource/src/file.rs b/datafusion/datasource/src/file.rs index c9b5c416f0..d0557e9f08 100644 --- a/datafusion/datasource/src/file.rs +++ b/datafusion/datasource/src/file.rs @@ -25,6 +25,7 @@ use std::sync::Arc; use crate::file_groups::FileGroupPartitioner; use crate::file_scan_config::FileScanConfig; use crate::file_stream::FileOpener; +use crate::schema_adapter::SchemaAdapterFactory; use arrow::datatypes::SchemaRef; use datafusion_common::config::ConfigOptions; use datafusion_common::{Result, Statistics}; @@ -35,6 +36,11 @@ use datafusion_physical_plan::DisplayFormatType; use object_store::ObjectStore; +/// Helper function to convert any type implementing FileSource to Arc<dyn FileSource> +pub fn as_file_source<T: FileSource + 'static>(source: T) -> Arc<dyn FileSource> { + Arc::new(source) +} + /// file format specific behaviors for elements in [`DataSource`] /// /// See more details on specific implementations: @@ -116,4 
+122,23 @@ pub trait FileSource: Send + Sync { ) -> Result<FilterPushdownPropagation<Arc<dyn FileSource>>> { Ok(FilterPushdownPropagation::unsupported(filters)) } + + /// Set optional schema adapter factory. + /// + /// [`SchemaAdapterFactory`] allows user to specify how fields from the + /// file get mapped to that of the table schema. The default implementation + /// returns the original source. + /// + /// Note: You can implement this method and `schema_adapter_factory` + /// automatically using the [`crate::impl_schema_adapter_methods`] macro. + fn with_schema_adapter_factory( + &self, + factory: Arc<dyn SchemaAdapterFactory>, + ) -> Arc<dyn FileSource>; + + /// Returns the current schema adapter factory if set + /// + /// Note: You can implement this method and `with_schema_adapter_factory` + /// automatically using the [`crate::impl_schema_adapter_methods`] macro. + fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>>; } diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index f93792b7fa..39431f3e9f 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -24,6 +24,8 @@ use std::{ }; use crate::file_groups::FileGroup; +#[allow(unused_imports)] +use crate::schema_adapter::SchemaAdapterFactory; use crate::{ display::FileGroupsDisplay, file::FileSource, @@ -83,6 +85,7 @@ use log::{debug, warn}; /// # use datafusion_execution::object_store::ObjectStoreUrl; /// # use datafusion_physical_plan::ExecutionPlan; /// # use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; +/// # use datafusion_datasource::schema_adapter::SchemaAdapterFactory; /// # let file_schema = Arc::new(Schema::new(vec![ /// # Field::new("c1", DataType::Int32, false), /// # Field::new("c2", DataType::Int32, false), @@ -92,7 +95,8 @@ use log::{debug, warn}; /// # // Note: crate mock ParquetSource, as ParquetSource is not in the datasource crate /// #[derive(Clone)] /// # struct ParquetSource { -/// # projected_statistics: Option<Statistics> +/// # projected_statistics: Option<Statistics>, +/// # schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>> /// # }; /// # impl FileSource for ParquetSource { /// # fn create_file_opener(&self, _: Arc<dyn ObjectStore>, _: &FileScanConfig, _: usize) -> Arc<dyn FileOpener> { unimplemented!() } @@ -100,13 +104,15 @@ use log::{debug, warn}; /// # fn with_batch_size(&self, _: usize) -> Arc<dyn FileSource> { unimplemented!() } /// # fn with_schema(&self, _: SchemaRef) -> Arc<dyn FileSource> { Arc::new(self.clone()) as Arc<dyn FileSource> } /// # fn with_projection(&self, _: &FileScanConfig) -> Arc<dyn FileSource> { unimplemented!() } -/// # fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> { Arc::new(Self {projected_statistics: Some(statistics)} ) } +/// # fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> { Arc::new(Self {projected_statistics: Some(statistics), schema_adapter_factory: self.schema_adapter_factory.clone()} ) } /// # fn metrics(&self) -> &ExecutionPlanMetricsSet { unimplemented!() } /// # fn statistics(&self) -> datafusion_common::Result<Statistics> { Ok(self.projected_statistics.clone().expect("projected_statistics should be set")) } /// # fn file_type(&self) -> &str { "parquet" } +/// # fn with_schema_adapter_factory(&self, factory: Arc<dyn SchemaAdapterFactory>) -> Arc<dyn FileSource> { Arc::new(Self {projected_statistics: self.projected_statistics.clone(), schema_adapter_factory: 
Some(factory)} ) } +/// # fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> { self.schema_adapter_factory.clone() } /// # } /// # impl ParquetSource { -/// # fn new() -> Self { Self {projected_statistics: None} } +/// # fn new() -> Self { Self {projected_statistics: None, schema_adapter_factory: None} } /// # } /// // create FileScan config for reading parquet files from file:// /// let object_store_url = ObjectStoreUrl::local_filesystem(); diff --git a/datafusion/datasource/src/macros.rs b/datafusion/datasource/src/macros.rs new file mode 100644 index 0000000000..c7a4058f23 --- /dev/null +++ b/datafusion/datasource/src/macros.rs @@ -0,0 +1,145 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Macros for the datafusion-datasource crate + +/// Helper macro to generate schema adapter methods for FileSource implementations +/// +/// Place this inside *any* `impl FileSource for YourType { … }` to +/// avoid copy-pasting `with_schema_adapter_factory` and +/// `schema_adapter_factory`. 
+/// +/// # Availability +/// +/// This macro is exported at the crate root level via `#[macro_export]`, so it can be +/// imported directly from the crate: +/// +/// ```rust,no_run +/// use datafusion_datasource::impl_schema_adapter_methods; +/// ``` +/// +/// # Note on path resolution +/// When this macro is used: +/// - `$crate` expands to `datafusion_datasource` (the crate root) +/// - `$crate::file::FileSource` refers to the FileSource trait from this crate +/// - `$crate::schema_adapter::SchemaAdapterFactory` refers to the SchemaAdapterFactory trait +/// +/// # Example Usage +/// +/// ```rust,no_run +/// use std::sync::Arc; +/// use std::any::Any; +/// use std::fmt::{Formatter, Display, self}; +/// use arrow::datatypes::SchemaRef; +/// use datafusion_common::{Result, Statistics}; +/// use object_store::ObjectStore; +/// use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; +/// use datafusion_physical_plan::DisplayFormatType; +/// use datafusion_physical_expr_common::sort_expr::LexOrdering; +/// use datafusion_datasource::file::FileSource; +/// use datafusion_datasource::file_stream::FileOpener; +/// use datafusion_datasource::file_scan_config::FileScanConfig; +/// use datafusion_datasource::impl_schema_adapter_methods; +/// use datafusion_datasource::schema_adapter::SchemaAdapterFactory; +/// +/// #[derive(Clone)] +/// struct MyFileSource { +/// schema: SchemaRef, +/// batch_size: usize, +/// statistics: Statistics, +/// projection: Option<Vec<usize>>, +/// schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>, +/// metrics: ExecutionPlanMetricsSet, +/// } +/// +/// impl FileSource for MyFileSource { +/// fn create_file_opener( +/// &self, +/// object_store: Arc<dyn ObjectStore>, +/// base_config: &FileScanConfig, +/// partition: usize, +/// ) -> Arc<dyn FileOpener> { +/// // Implementation here +/// unimplemented!() +/// } +/// +/// fn as_any(&self) -> &dyn Any { +/// self +/// } +/// +/// fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> { +/// let mut new_source = self.clone(); +/// new_source.batch_size = batch_size; +/// Arc::new(new_source) +/// } +/// +/// fn with_schema(&self, schema: SchemaRef) -> Arc<dyn FileSource> { +/// let mut new_source = self.clone(); +/// new_source.schema = schema; +/// Arc::new(new_source) +/// } +/// +/// fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource> { +/// let mut new_source = self.clone(); +/// new_source.projection = config.file_column_projection_indices(); +/// Arc::new(new_source) +/// } +/// +/// fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> { +/// let mut new_source = self.clone(); +/// new_source.statistics = statistics; +/// Arc::new(new_source) +/// } +/// +/// fn metrics(&self) -> &ExecutionPlanMetricsSet { +/// &self.metrics +/// } +/// +/// fn statistics(&self) -> Result<Statistics> { +/// Ok(self.statistics.clone()) +/// } +/// +/// fn file_type(&self) -> &str { +/// "my_file_type" +/// } +/// +/// // Use the macro to implement schema adapter methods +/// impl_schema_adapter_methods!(); +/// } +/// ``` +#[macro_export(local_inner_macros)] +macro_rules! 
impl_schema_adapter_methods { + () => { + fn with_schema_adapter_factory( + &self, + schema_adapter_factory: std::sync::Arc< + dyn $crate::schema_adapter::SchemaAdapterFactory, + >, + ) -> std::sync::Arc<dyn $crate::file::FileSource> { + std::sync::Arc::new(Self { + schema_adapter_factory: Some(schema_adapter_factory), + ..self.clone() + }) + } + + fn schema_adapter_factory( + &self, + ) -> Option<std::sync::Arc<dyn $crate::schema_adapter::SchemaAdapterFactory>> { + self.schema_adapter_factory.clone() + } + }; +} diff --git a/datafusion/datasource/src/mod.rs b/datafusion/datasource/src/mod.rs index 9e83adc6b9..1c27cd4922 100644 --- a/datafusion/datasource/src/mod.rs +++ b/datafusion/datasource/src/mod.rs @@ -37,6 +37,7 @@ pub mod file_meta; pub mod file_scan_config; pub mod file_sink_config; pub mod file_stream; +pub mod macros; pub mod memory; pub mod schema_adapter; pub mod sink; @@ -48,6 +49,7 @@ pub mod test_util; pub mod url; pub mod write; +pub use self::file::as_file_source; pub use self::url::ListingTableUrl; use crate::file_groups::FileGroup; use chrono::TimeZone; diff --git a/datafusion/datasource/src/test_util.rs b/datafusion/datasource/src/test_util.rs index e75c7c3a3d..aac61c7812 100644 --- a/datafusion/datasource/src/test_util.rs +++ b/datafusion/datasource/src/test_util.rs @@ -17,6 +17,7 @@ use crate::{ file::FileSource, file_scan_config::FileScanConfig, file_stream::FileOpener, + impl_schema_adapter_methods, schema_adapter::SchemaAdapterFactory, }; use std::sync::Arc; @@ -32,6 +33,7 @@ use object_store::ObjectStore; pub(crate) struct MockSource { metrics: ExecutionPlanMetricsSet, projected_statistics: Option<Statistics>, + schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>, } impl FileSource for MockSource { @@ -81,6 +83,8 @@ impl FileSource for MockSource { fn file_type(&self) -> &str { "mock" } + + impl_schema_adapter_methods!(); } /// Create a column expression --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org For additional commands, e-mail: commits-h...@datafusion.apache.org